Redis 支持
Redis 支持
Spring Integration 2.1 引入了对 Redis 的支持:“开源高级键值存储”。这种支持以基于 Redis 的形式出现MessageStore以及 Redis 通过其PUBLISH,SUBSCRIBE和UNSUBSCRIBE命令。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.0.9"
您还需要包含 Redis 客户端依赖项,例如 Lettuce。
若要下载、安装和运行 Redis,请参阅 Redis 文档。
连接到 Redis
要开始与 Redis 交互,您首先需要连接到它。Spring Integration 使用另一个 Spring 项目 Spring Data Redis 提供的支持,它提供了典型的 Spring 结构:ConnectionFactory和Template. 这些抽象简化了与多个 Redis 客户端 Java API 的集成。目前,Spring Data Redis 支持 Jedis 和 Lettuce。
用RedisConnectionFactory
要连接到 Redis,您可以使用RedisConnectionFactory接口。 以下列表显示了接口定义:
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例演示如何创建LettuceConnectionFactory在 Java 中:
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下示例演示如何创建LettuceConnectionFactory在 Spring 的 XML 配置中:
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
的实现RedisConnectionFactory提供一组属性,例如端口和主机,您可以根据需要进行设置。一旦您拥有RedisConnectionFactory,您可以创建RedisTemplate并注入RedisConnectionFactory.
用RedisTemplate
与 Spring 中的其他模板类(例如JdbcTemplate和JmsTemplate) RedisTemplate是一个简化 Redis 数据访问代码的帮助程序类。有关RedisTemplate及其变体(例如StringRedisTemplate)请参阅 Spring Data Redis 文档。
以下示例展示了如何创建RedisTemplate在 Java 中:
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例展示了如何创建RedisTemplate在 Spring 的 XML 配置中:
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 发送消息
正如简介中提到的,Redis 通过其PUBLISH,SUBSCRIBE和UNSUBSCRIBE命令。 与 JMS 和 AMQP 一样,Spring Integration 提供了消息通道和适配器,用于通过 Redis 发送和接收消息。
Redis 发布/订阅通道
与 JMS 类似,在某些情况下,生产者和消费者都打算成为同一应用程序的一部分,在同一进程中运行。您可以通过使用一对入站和出站通道适配器来实现此目的。但是,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决此用例。您可以创建一个发布-订阅通道,如以下示例所示:
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
一个publish-subscribe-channel行为很像正常人<publish-subscribe-channel/>元素。它可以被input-channel和output-channel任何端点的属性。不同之处在于,此通道由 Redis 主题名称支持:一个String由topic-name属性。 但是,与 JMS 不同的是,此主题不必提前创建,甚至不必由 Redis 自动创建。在 Redis 中,主题很简单String值,这些值都扮演着地址的角色。生产者和消费者可以使用相同的Stringvalue 作为其主题名称。对此通道的简单订阅意味着在生产端点和使用端点之间可以进行异步发布-订阅消息传递。但是,与通过添加<queue/>元素在简单的 Spring Integration 中<channel/>元素,消息不会存储在内存队列中。相反,这些消息通过 Redis 传递,这使您可以依赖它对持久性和集群的支持以及它与其他非 Java 平台的互作性。
Redis 入站通道适配器
Redis 入站通道适配器 (RedisInboundChannelAdapter)将传入的 Redis 消息调整为 Spring 消息,其方式与其他入站适配器相同。它接收特定于平台的消息(在本例中为 Redis),并使用MessageConverter策略。 以下示例演示如何配置 Redis 入站通道适配器:
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。请注意,前面的配置依赖于熟悉的自动发现某些 Bean 的 Spring 范例。在这种情况下,redisConnectionFactory隐式注入到适配器中。您可以使用connection-factory属性。
另请注意,前面的配置向适配器注入了自定义MessageConverter. 该方法类似于 JMS,其中MessageConverter实例用于在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。默认值是SimpleMessageConverter.
入站适配器可以订阅多个主题名称,因此topics属性。
从 3.0 版本开始,入站适配器除了现有的topics属性,现在具有topic-patterns属性。 此属性包含一组以逗号分隔的 Redis 主题模式。有关 Redis 发布-订阅的更多信息,请参阅 Redis 发布/订阅。
入站适配器可以使用RedisSerializer反序列化 Redis 消息的正文。 这serializer属性的<int-redis:inbound-channel-adapter>可以设置为空字符串,这会导致null值RedisSerializer财产。 在这种情况下,原始byte[]Redis 消息的正文作为消息有效负载提供。
从 5.0 版本开始,您可以提供Executor实例添加到入站适配器,方法是使用task-executor属性的<int-redis:inbound-channel-adapter>. 此外,收到的 Spring Integration 消息现在具有RedisHeaders.MESSAGE_SOURCE标头来指示已发布消息的来源:主题或模式。您可以将此下游用于路由逻辑。
Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将传出 Spring Integration 消息调整为 Redis 消息。它接收 Spring Integration 消息,并使用MessageConverter策略。 以下示例演示如何配置 Redis 出站通道适配器:
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
该配置与 Redis 入站通道适配器并行。适配器隐式注入了RedisConnectionFactory,其定义为redisConnectionFactory作为其 bean 名称。此示例还包括可选(和自定义)MessageConverter(这testConverter豆子)。
自 Spring Integration 3.0 以来,<int-redis:outbound-channel-adapter>提供了topic属性:您可以使用topic-expression属性来确定运行时消息的 Redis 主题。这些属性是互斥的。
Redis 队列入站通道适配器
Spring Integration 3.0 引入了一个队列入站通道适配器来“弹出”来自 Redis 列表的消息。默认情况下,它使用“右弹出”,但您可以将其配置为使用“左弹出”。适配器是消息驱动的。它使用内部侦听器线程,不使用轮询器。
以下列表显示了queue-inbound-channel-adapter:
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
| 1 | 组件 bean 名称。如果您没有提供channel属性,一个DirectChannel在应用程序上下文中创建和注册,并使用此id属性作为 bean 名称。在这种情况下,端点本身使用 bean 名称注册id加.adapter. (如果 bean 名称为thing1,则终结点注册为thing1.adapter.) |
| 2 | 这MessageChannel发送到哪个Message实例。 |
| 3 | 一个SmartLifecycle属性来指定此终结点是否应在应用程序上下文启动后自动启动。它默认为true. |
| 4 | 一个SmartLifecycle属性来指定启动此端点的阶段。它默认为0. |
| 5 | 对RedisConnectionFactory豆。 它默认为redisConnectionFactory. |
| 6 | 执行基于队列的“pop”作以获取 Redis 消息的 Redis 列表的名称。 |
| 7 | 这MessageChannel发送到哪个ErrorMessage从终结点的侦听任务收到异常时的实例。默认情况下,基础MessagePublishingErrorHandler使用默认的errorChannel从应用程序上下文中。 |
| 8 | 这RedisSerializerbean 引用。
它可以是一个空字符串,这意味着“没有序列化程序”。
在这种情况下,原始byte[]从入站 Redis 消息发送到channel作为Message有效载荷。
默认情况下,它是JdkSerializationRedisSerializer. |
| 9 | “pop”作等待来自队列的 Redis 消息的超时时间(以毫秒为单位)。 默认值为 1 秒。 |
| 10 | 侦听器任务在重新启动侦听器任务之前,在“pop”作出现异常后应休眠的时间(以毫秒为单位)。 |
| 11 | 指定此端点是否期望 Redis 队列中的数据包含整个Message实例。
如果此属性设置为true这serializer不能是空字符串,因为消息需要某种形式的反序列化(默认情况下是 JDK 序列化)。
其默认值为false. |
| 12 | 对弹簧的引用TaskExecutor(或标准 JDK 1.5+Executor)豆。
它用于底层侦听任务。
它默认为SimpleAsyncTaskExecutor. |
| 13 | 指定此端点是否应使用“right pop”(当true) 或“左弹出”(当false) 读取 Redis 列表中的消息。
如果true,Redis 列表充当FIFO与默认 Redis 队列出站通道适配器一起使用时的队列。
将其设置为false与通过“右推”写入列表的软件一起使用,或实现类似堆栈的消息顺序。
其默认值为true.
从 4.3 版本开始。 |
这task-executor必须配置多个线程进行处理;否则,当RedisQueueMessageDrivenEndpoint尝试在错误后重新启动侦听器任务。
这errorChannel可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况下。
请参阅 Spring Framework 参考手册 了解可能的情况TaskExecutor实现。 |
Redis 队列出站通道适配器
Spring Integration 3.0 引入了一个队列出站通道适配器,用于从 Spring Integration 消息“推送”到 Redis 列表。
默认情况下,它使用“左推”,但您可以将其配置为使用“右推”。
以下列表显示了 Redis 的所有可用属性queue-outbound-channel-adapter:
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
| 1 | 组件 bean 名称。如果您没有提供channel属性,一个DirectChannel在应用程序上下文中创建和注册,并使用此id属性作为 bean 名称。
在这种情况下,端点注册的 bean 名称为id加.adapter. (如果 bean 名称为thing1,则终结点注册为thing1.adapter.) |
| 2 | 这MessageChannel此端点从中接收Message实例。 |
| 3 | 对RedisConnectionFactory豆。 它默认为redisConnectionFactory. |
| 4 | 执行基于队列的“推送”作以发送 Redis 消息的 Redis 列表的名称。
此属性与queue-expression. |
| 5 | 一个 SpELExpression以确定 Redis 列表的名称。
它使用传入的Message在运行时作为#root变量。
此属性与queue. |
| 6 | 一个RedisSerializerbean 引用。
它默认为JdkSerializationRedisSerializer.
但是,对于String有效载荷,一个StringRedisSerializer,如果serializer未提供参考。 |
| 7 | 指定此端点是应仅发送有效负载还是整个Message到 Redis 队列。
它默认为true. |
| 8 | 指定此端点是否应使用“左推”(当true) 或“右推”(当false) 将消息写入 Redis 列表。
如果true,Redis 列表充当FIFO队列,当与默认的 Redis 队列入站通道适配器一起使用时。
将其设置为false与使用“左弹出”从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。
它默认为true.
从 4.3 版本开始。 |
Redis 应用程序事件
从 Spring Integration 3.0 开始,Redis 模块提供了IntegrationEvent,这反过来又是一个org.springframework.context.ApplicationEvent.
这RedisExceptionEvent封装 Redis作的异常(端点是事件的“源”)。
例如,<int-redis:queue-inbound-channel-adapter/>在从BoundListOperations.rightPop操作。
例外可以是任何通用的org.springframework.data.redis.RedisSystemException或org.springframework.data.redis.RedisConnectionFailureException.
使用<int-event:inbound-channel-adapter/>可用于确定后台 Redis 任务的问题并采取管理作。
Redis 消息存储
如《企业集成模式 (EIP)》一书中所述,消息存储允许您持久化消息。
当可靠性受到关注时,这在处理具有缓冲消息功能的组件(聚合器、重排序器等)时非常有用。
在 Spring Integration 中,MessageStore策略还为声明检查模式提供了基础,这在 EIP 中也有描述。
Spring Integration 的 Redis 模块提供了RedisMessageStore.
以下示例演示如何将其与聚合器一起使用:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是一个 bean 配置,它需要一个RedisConnectionFactory作为构造函数参数。
默认情况下,RedisMessageStore使用 Java 序列化来序列化消息。
但是,如果您想使用不同的序列化技术(例如 JSON),您可以通过将valueSerializer属性的RedisMessageStore.
从 4.3.10 版开始,框架为Messageinstances 和MessageHeaders实例 —MessageJacksonDeserializer和MessageHeadersJacksonSerializer分别。
它们必须配置为SimpleModule选项ObjectMapper.
此外,您应该将enableDefaultTyping在ObjectMapper为每个序列化的复杂对象添加类型信息(如果您信任源)。
然后,在反序列化期间使用该类型信息。
该框架提供了一个名为JacksonJsonUtils.messagingAwareMapper(),它已经提供了前面提到的所有属性和序列化器。
此实用程序方法附带了trustedPackages参数来限制 Java 包进行反序列化以避免安全漏洞。
默认的受信任包:java.util,java.lang,org.springframework.messaging.support,org.springframework.integration.support,org.springframework.integration.message,org.springframework.integration.store.
要在RedisMessageStore,您必须以类似于以下示例的方式对其进行配置:
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从 4.3.12 版本开始,RedisMessageStore支持prefix选项,以允许区分同一 Redis 服务器上的存储实例。
Redis 通道消息存储
这RedisMessageStore 前面显示的将每个组维护为单个键(组 ID)下的值。
虽然您可以使用它来支持QueueChannel对于持久性,专门的RedisChannelMessageStore是为此目的提供的(自 4.0 版起)。
此商店使用LIST对于每个通道,LPUSH发送消息时,以及RPOP接收消息时。
默认情况下,此存储也使用 JDK 序列化,但您可以修改值序列化程序,如前所述。
建议使用此商店支持渠道,而不是使用常规RedisMessageStore.
以下示例定义了 Redis 消息存储,并在具有队列的通道中使用它:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键具有以下形式:<storeBeanName>:<channelId>(在前面的示例中,redisMessageStore:somePersistentQueueChannel).
此外,一个子类RedisChannelPriorityMessageStore还提供。
当您将其与QueueChannel,则按 (FIFO) 优先级顺序接收消息。
它使用标准IntegrationMessageHeaderAccessor.PRIORITY标头,并支持优先级值 (0 - 9).
具有其他优先级的消息(以及没有优先级的消息)将按照 FIFO 顺序在任何具有优先级的消息之后检索。
这些商店仅实现BasicMessageGroupStore并且不实现MessageGroupStore.
它们只能用于支持等情况QueueChannel. |
Redis 元数据存储
Spring Integration 3.0 引入了一个新的基于 RedisMetadataStore(请参阅元数据存储)实现。
您可以使用RedisMetadataStore以保持MetadataStore跨应用程序重启。
您可以使用这个新的MetadataStore使用适配器实现,例如:
要指示这些适配器使用新的RedisMetadataStore,声明一个名为metadataStore.
Feed 入站通道适配器和 Feed 入站通道适配器都会自动选取并使用声明的RedisMetadataStore.
以下示例显示了如何声明这样的 bean:
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
这RedisMetadataStore由RedisProperties.
与它的交互使用BoundHashOperations,这反过来又需要一个key对于整个Properties商店。
在MetadataStore这key扮演区域的角色,这在分布式环境中很有用,当多个应用程序使用同一 Redis 服务器时。
默认情况下,此key其值为MetaData.
从 4.0 版开始,此存储实现ConcurrentMetadataStore,使其在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。
您不能使用RedisMetadataStore.replace()(例如,在AbstractPersistentAcceptOnceFileListFilter)替换为 Redis 集群,因为WATCH当前不支持原子性命令。 |
Redis Store 入站通道适配器
Redis 存储入站通道适配器是一个轮询使用者,它从 Redis 集合中读取数据并将其作为Message有效载荷。
以下示例演示如何配置 Redis 存储入站通道适配器:
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例演示如何使用store-inbound-channel-adapter元素,为各种属性提供值,例如:
-
key或key-expression:正在使用的集合的密钥的名称。 -
collection-type:此适配器支持的集合类型的枚举。 支持的集合包括LIST,SET,ZSET,PROPERTIES和MAP. -
connection-factory:对o.s.data.redis.connection.RedisConnectionFactory. -
redis-template:对o.s.data.redis.core.RedisTemplate. -
所有其他入站适配器中通用的其他属性(例如“channel”)。
不能同时设置两者redis-template和connection-factory. |
|
默认情况下,适配器使用
这 |
因为它具有key,前面的示例相对简单且静态。
有时,您可能需要根据某些条件在运行时更改键的值。
为此,请使用key-expression相反,提供的表达式可以是任何有效的 SpEL 表达式。
此外,您可能希望对从 Redis 集合中读取的成功处理的数据执行一些后处理。
例如,您可能希望在处理值后移动或删除该值。
您可以使用 Spring Integration 2.2 中添加的事务同步功能来执行此作。
以下示例使用key-expression和事务同步:
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
您可以使用transactional元素。
此元素可以引用真实的事务管理器(例如,如果流的其他部分调用 JDBC)。
如果您没有“真实”交易,您可以使用o.s.i.transaction.PseudoTransactionManager,这是 Spring 的PlatformTransactionManager并在没有实际事务时启用 Redis 适配器的事务同步功能。
| 这不会使 Redis 活动本身成为事务性的。它允许在成功(提交)或失败(回滚)之前或之后执行作同步。 |
一旦轮询器是事务性的,您就可以设置o.s.i.transaction.TransactionSynchronizationFactory在transactional元素。TransactionSynchronizationFactory创建TransactionSynchronization. 为了您的方便,我们公开了一个默认的基于 SpELTransactionSynchronizationFactory,它允许您配置 SpEL 表达式,并使其执行与事务协调(同步)。支持提交前、提交后和回滚后表达式,以及发送评估结果(如果有)的通道(每种事件一个)。对于每个子元素,您可以指定expression和channel属性。 如果只有channel属性存在,则接收到的消息将作为特定同步场景的一部分发送到那里。如果只有expression属性存在,并且表达式的结果是非空值,则生成一条消息,其结果为有效负载并发送到默认通道(NullChannel)并出现在日志中(在DEBUG级别)。如果希望评估结果转到特定通道,请添加channel属性。 如果表达式的结果为 null 或 void,则不会生成任何消息。
有关事务同步的详细信息,请参阅事务同步。
RedisStore 出站通道适配器
RedisStore 出站通道适配器允许将消息有效负载写入 Redis 集合,如以下示例所示:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置是 Redis 存储出站通道适配器,方法是使用store-inbound-channel-adapter元素。 它为各种属性提供值,例如:
-
key或key-expression:正在使用的集合的密钥的名称。 -
extract-payload-elements:如果设置为true(默认值),并且有效负载是“多值”对象的实例(即Collection或Map),它通过使用“addAll”和“putAll”语义进行存储。 否则,如果设置为false,则有效负载存储为单个条目,无论其类型如何。 如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。 -
collection-type:枚举Collection此适配器支持的类型。 支持的集合包括LIST,SET,ZSET,PROPERTIES和MAP. -
map-key-expression:返回要存储的条目的键名称的 SpEL 表达式。 它仅适用于collection-type是MAP或PROPERTIES并且 'extract-payload-elements' 为 false。 -
connection-factory:对o.s.data.redis.connection.RedisConnectionFactory. -
redis-template:对o.s.data.redis.core.RedisTemplate. -
所有其他入站适配器中通用的其他属性(例如“channel”)。
不能同时设置两者redis-template和connection-factory. |
默认情况下,适配器使用StringRedisTemplate.
这使用StringRedisSerializer键、值、哈希键和哈希值的实例。
但是,如果extract-payload-elements设置为false一个RedisTemplate那有StringRedisSerializer实例,用于键和哈希键,以及JdkSerializationRedisSerializer将使用值和哈希值的实例 s。
对于 JDK 序列化程序,重要的是要了解 Java 序列化用于所有值,无论该值是否实际上是集合。
如果您需要对值的序列化进行更多控制,请考虑提供您自己的RedisTemplate而不是依赖这些默认值。 |
因为它具有key等属性,前面的例子比较简单和静态。
有时,您可能需要在运行时根据某些条件动态更改值。
为此,请使用其-expression等效 (key-expression,map-key-expression,依此类推),其中提供的表达式可以是任何有效的 SpEL 表达式。
Redis 出站命令网关
Spring Integration 4.0 引入了 Redis 命令网关,允许您使用通用RedisConnection#execute方法。
以下列表显示了 Redis 出站网关的可用属性:
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
| 1 | 这MessageChannel此端点从中接收Message实例。 |
| 2 | 这MessageChannel此终结点发送回复的位置Message实例。 |
| 3 | 指定此出站网关是否必须返回非空值。
它默认为true.
一个ReplyRequiredException当 Redis 返回null价值。 |
| 4 | 等待回复消息发送的超时(以毫秒为单位)。 它通常适用于基于队列的有限应答通道。 |
| 5 | 对RedisConnectionFactory豆。 它默认为redisConnectionFactory.
它与 'redis-template' 属性是互斥的。 |
| 6 | 对RedisTemplate豆。
它与“连接工厂”属性互斥。 |
| 7 | 对org.springframework.data.redis.serializer.RedisSerializer.
如有必要,它用于将每个命令参数序列化为 byte[]。 |
| 8 | 返回命令键的 SpEL 表达式。
它默认为redis_command消息头。
它不得计算为null. |
| 9 | 计算为命令参数的逗号分隔的 SpEL 表达式。
与arguments-strategy属性。
如果您既不提供这两个属性,则payload用作命令参数。
参数表达式可以计算为“null”以支持可变数量的参数。 |
| 10 | 一个boolean标志来指定评估的 Redis 命令字符串是否用作#cmd变量在表达式评估上下文中的o.s.i.redis.outbound.ExpressionArgumentsStrategy什么时候argument-expressions已配置。
否则,将忽略此属性。 |
| 11 | 引用o.s.i.redis.outbound.ArgumentsStrategy.
它与argument-expressions属性。
如果您既不提供这两个属性,则payload用作命令参数。 |
您可以使用<int-redis:outbound-gateway>作为执行任何所需 Redis作的通用组件。
以下示例显示了如何从 Redis 原子序数获取递增值:
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
这Message有效负载的名称应为redisCounter,可由org.springframework.data.redis.support.atomic.RedisAtomicIntegerbean 定义。
这RedisConnection#execute方法具有泛型Object作为其返回类型。
实际结果取决于命令类型。
例如MGET返回一个List<byte[]>.
有关命令、其参数和结果类型的详细信息,请参阅 Redis 规范。
Redis 队列出站网关
Spring Integration 引入了 Redis 队列出站网关来执行请求和回复场景。
它推动对话UUID到提供的queue,用它推送值UUID作为 Redis 列表的键,并等待来自键为UUID' plus '.reply.
每次交互使用不同的 UUID。
以下列表显示了 Redis 出站网关的可用属性:
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
| 1 | 这MessageChannel此端点从中接收Message实例。 |
| 2 | 这MessageChannel此终结点发送回复的位置Message实例。 |
| 3 | 指定此出站网关是否必须返回非空值。
此值为false默认情况下。
否则,一个ReplyRequiredException当 Redis 返回null价值。 |
| 4 | 等待回复消息发送的超时(以毫秒为单位)。 它通常适用于基于队列的有限应答通道。 |
| 5 | 对RedisConnectionFactory豆。 它默认为redisConnectionFactory.
它与 'redis-template' 属性是互斥的。 |
| 6 | 出站网关向其发送对话的 Redis 列表的名称UUID. |
| 7 | 注册多个网关时此出站网关的顺序。 |
| 8 | 这RedisSerializerbean 引用。
它可以是一个空字符串,表示“没有序列化程序”。
在这种情况下,原始byte[]从入站 Redis 消息发送到channel作为Message有效载荷。
默认情况下,它是JdkSerializationRedisSerializer. |
| 9 | 指定此端点是否期望 Redis 队列中的数据包含整个Message实例。
如果此属性设置为true这serializer不能是空字符串,因为消息需要某种形式的反序列化(默认情况下是 JDK 序列化)。 |
Redis 队列入站网关
Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和回复场景。
它弹出对话UUID从提供的queue,弹出带有该值的值UUID作为 Redis 列表中的键,并将回复推送到 Redis 列表,键为UUID加.reply.
以下列表显示了 Redis 队列入站网关的可用属性:
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
| 1 | 这MessageChannel此终结点发送的位置Message从 Redis 数据创建的实例。 |
| 2 | 这MessageChannel此终结点从何处等待回复Message实例。
可选 -replyChannelheader 仍在使用中。 |
| 3 | 对弹簧的引用TaskExecutor(或标准 JDKExecutor)豆。
它用于底层侦听任务。
它默认为SimpleAsyncTaskExecutor. |
| 4 | 等待回复消息发送的超时(以毫秒为单位)。 它通常适用于基于队列的有限应答通道。 |
| 5 | 对RedisConnectionFactory豆。 它默认为redisConnectionFactory.
它与 'redis-template' 属性是互斥的。 |
| 6 | 对话的 Redis 列表的名称UUID. |
| 7 | 注册多个网关时此入站网关的顺序。 |
| 8 | 这RedisSerializerbean 引用。
它可以是一个空字符串,表示“没有序列化程序”。
在这种情况下,原始byte[]从入站 Redis 消息发送到channel作为Message有效载荷。
它默认为JdkSerializationRedisSerializer.
(请注意,在 4.3 版本之前的版本中,它是StringRedisSerializer默认情况下。
要恢复该行为,请提供对StringRedisSerializer). |
| 9 | 等待获取接收消息的超时(以毫秒为单位)。 它通常适用于基于队列的有限请求通道。 |
| 10 | 指定此端点是否期望 Redis 队列中的数据包含整个Message实例。
如果此属性设置为true这serializer不能是空字符串,因为消息需要某种形式的反序列化(默认情况下是 JDK 序列化)。 |
| 11 | 侦听器任务在重新启动侦听器任务之前,在“右弹出”作出现异常后应休眠的时间(以毫秒为单位)。 |
这task-executor必须配置多个线程进行处理;否则,当RedisQueueMessageDrivenEndpoint尝试在错误后重新启动侦听器任务。
这errorChannel可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况下。
请参阅 Spring Framework 参考手册 了解可能的情况TaskExecutor实现。 |
Redis Stream 出站通道适配器
Spring Integration 5.4 引入了 Reactive Redis Stream 出站通道适配器,以将消息有效负载写入 Redis 流。出站通道适配器使用ReactiveStreamOperations.add(…)添加一个Record到流。以下示例演示了如何使用 Redis 流出站通道适配器的 Java 配置和服务类。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
| 1 | 构造ReactiveRedisStreamMessageHandler用ReactiveRedisConnectionFactory和流名称以添加记录。另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。 |
| 2 | 设置一个RedisSerializationContext用于在添加到流之前序列化记录键和值。 |
| 3 | 设置HashMapper它提供 Java 类型和 Redis 哈希/映射之间的契约。 |
| 4 | 如果为“true”,则通道适配器将从请求消息中提取有效负载,以便添加流记录。或者将整个消息用作值。它默认为true. |
Redis Stream 入站通道适配器
Spring Integration 5.4 引入了 Reactive Stream 入站通道适配器,用于从 Redis Stream 读取消息。入站通道适配器使用StreamReceiver.receive(…)或StreamReceiver.receiveAutoAck()基于自动确认标志从 Redis 流读取记录。以下示例展示了如何使用 Redis 流入站通道适配器的 Java 配置。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
| 1 | 构造ReactiveRedisStreamMessageProducer用ReactiveRedisConnectionFactory和流键读取记录。 |
| 2 | 一个StreamReceiver.StreamReceiverOptions使用响应式基础设施使用 Redis Stream。 |
| 3 | 一个SmartLifecycle属性来指定此终结点是否应在应用程序上下文启动后自动启动。它默认为true. 如果false,RedisStreamMessageProducer应手动启动messageProducer.start(). |
| 4 | 如果false,则不会自动确认收到的消息。消息的确认将推迟到客户端使用消息。默认为true. |
| 5 | 如果true,将创建一个消费者组。在创建消费者组流的过程中,也会创建(如果尚不存在)。消费者组跟踪消息传递并区分消费者。它默认为false. |
| 6 | 设置使用者组名称。它默认为定义的 bean 名称。 |
| 7 | 设置使用者名称。将消息读取为my-consumer从组my-group. |
| 8 | 要从此终结点向其发送消息的消息通道。 |
| 9 | 定义要读取消息的偏移量。它默认为ReadOffset.latest(). |
| 10 | 如果为“true”,则通道适配器将从Record. 否则,整体Record用作有效负载。它默认为true. |
如果autoAck设置为false这Record在 Redis Stream 中,Redis 驱动程序不会自动确认,而是IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKheader 添加到消息中,以生成SimpleAcknowledgmentinstance 作为值。
调用其acknowledge()每当根据此类记录为消息完成业务逻辑时,都会回调。
即使在反序列化期间发生异常时,也需要类似的逻辑,并且errorChannel已配置。
因此,目标错误处理程序必须决定确认或 nack 此类失败消息。
与IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK这ReactiveRedisStreamMessageProducer还会将这些标头填充到消息中以生成:RedisHeaders.STREAM_KEY,RedisHeaders.STREAM_MESSAGE_ID,RedisHeaders.CONSUMER_GROUP和RedisHeaders.CONSUMER.
从 5.5 版开始,您可以配置StreamReceiver.StreamReceiverOptionsBuilder选项显式地在ReactiveRedisStreamMessageProducer,包括新推出的onErrorResume函数,如果 Redis Stream 使用者在发生反序列化错误时应继续轮询,则需要该函数。
默认函数向错误通道(如果提供)发送一条消息,并可能确认失败的消息,如上所述。
所有这些StreamReceiver.StreamReceiverOptionsBuilder与外部提供的StreamReceiver.StreamReceiverOptions.
Redis 锁注册表
Spring Integration 4.0 引入了RedisLockRegistry.
某些组件(例如,聚合器和重排序器)使用从LockRegistry实例,以确保一次只有一个线程作一个组。
这DefaultLockRegistry在单个组件中执行此功能。
现在可以在这些组件上配置外部锁注册表。
当您将其与共享的MessageGroupStore,您可以使用RedisLockRegistry跨多个应用程序实例提供此功能,以便一次只有一个实例可以作该组。
当本地线程释放锁时,另一个本地线程通常可以立即获取锁。 如果线程使用其他注册表实例释放锁,则获取锁最多可能需要 100 毫秒。
为避免“挂起”锁(当服务器发生故障时),此注册表中的锁在默认 60 秒后过期,但您可以在注册表上配置此值。 锁的持有时间通常要短得多。
| 由于密钥可能会过期,因此尝试解锁过期的锁会导致引发异常。 但是,受此类锁保护的资源可能已被泄露,因此应认为此类例外情况是严重的。 应将到期时间设置为足够大的值以防止这种情况,但将其设置为足够低的值,以便在服务器发生故障后可以在合理的时间内恢复锁。 |
从 5.0 版开始,RedisLockRegistry实现ExpirableLockRegistry,它删除上次获取的锁超过age之前,并且当前未锁定。
String 版本为 5.5.6,则RedisLockRegistry支持自动清理 redisLocks 的缓存RedisLockRegistry.locks通过RedisLockRegistry.setCacheCapacity().
有关更多信息,请参阅其 JavaDocs。
String 版本为 5.5.13,则RedisLockRegistry公开一个setRedisLockType(RedisLockType)选项来确定应该在哪种模式下进行 Redis 锁获取:
-
RedisLockType.SPIN_LOCK- 通过定期循环(100ms)检查是否可以获取锁来获取锁。 违约。 -
RedisLockType.PUB_SUB_LOCK- 锁由 redis pub-sub 订阅获取。
pub-sub 是首选模式 - 客户端 Redis 服务器之间的网络聊天更少,并且性能更高 - 当订阅在另一个进程中收到解锁通知时,会立即获取锁。 但是,Redis 不支持主/副本连接中的发布-订阅(例如在 AWS ElastiCache 环境中),因此选择忙碌旋转模式作为默认模式,以使注册表在任何环境中工作。