|
此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Integration 7.0.4! |
Redis 支持
Spring Integration 2.1 引入了对 Redis("一个开源的高级键值存储")的支持。
该支持以基于 Redis 的 MessageStore 形式提供,以及通过 Redis 的 PUBLISH, SUBSCRIBE 和 UNSUBSCRIBE 命令支持的发布 - 订阅消息适配器。
此依赖项是项目所必需的:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>7.1.0-M3</version>
</dependency>
implementation "org.springframework.integration:spring-integration-redis:7.1.0-M3"
必须包含 Redis 客户端依赖,例如 Lettuce。
要下载、安装和运行 Redis,请参阅 Redis 文档。
连接至 Redis
要开始与 Redis 交互,首先必须获取连接。
Spring Integration 利用另一个 Spring 项目提供的支持,即 Spring Data Redis,它提供了典型的 Spring 构建块:ConnectionFactory 和 Template。
这些抽象简化了与多种 Redis 客户端 Java API 的集成。
目前,Spring Data Redis 支持 Jedis 和 Lettuce。
使用RedisConnectionFactory
The RedisConnectionFactory from Spring Data Redis is a high-level abstraction for managing connections with Redis.
The following listing shows the interface definition:
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例展示了如何在 Java 中创建一个 LettuceConnectionFactory:
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
下面的示例展示了如何在 Spring 的 XML 配置中创建 LettuceConnectionFactory:
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory的实现提供了一组属性,例如端口和主机。
一旦存在RedisConnectionFactory的实例,就可以创建RedisTemplate。
使用RedisTemplate
与 Spring 中的其他模板类(如 JdbcTemplate 和 JmsTemplate)一样,RedisTemplate 是一个辅助类,用于简化 Redis 数据访问代码。
有关 RedisTemplate 及其变体(如 StringRedisTemplate)的更多信息,请参阅 Spring Data Redis 文档。
下面的示例展示了如何在 Java 中创建 RedisTemplate 的实例:
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例展示了如何在 Spring 的 XML 配置中创建 RedisTemplate 的实例:
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 进行消息传递
如 介绍 中所提及,Redis 通过其 PUBLISH、SUBSCRIBE 和 UNSUBSCRIBE 命令支持发布 - 订阅消息传递。
与 JMS 和 AMQP 类似,Spring Integration 提供了用于通过 Redis 发送和接收消息的消息通道和适配器。
Redis 发布/订阅通道
与 JMS 类似,也存在生产者和消费者都属于同一应用程序、并在同一进程中运行的场景。 这可以通过一对入站和出站通道适配器来实现。 然而,正如 Spring Integration 的 JMS 支持一样,有更简单的方法来解决此用例。 相反,可以使用发布 - 订阅通道,如下例所示:
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
一个 publish-subscribe-channel 的行为与来自主 Spring Integration 命名空间的普通 <publish-subscribe-channel/> 元素非常相似。它可由任何端点的 input-channel 和 output-channel 属性引用。区别在于,此通道由一个 Redis 主题名称支持:即由 topic-name 属性指定的 String 值。然而,与 JMS 不同,该主题无需预先创建,甚至无需由 Redis 自动创建。在 Redis 中,主题(topics)是简单的 String 值,其作用相当于地址。生产者和消费者可以使用相同的 String 值作为其主题名称进行通信。对频道的简单订阅意味着生产端点和消费端点之间可以实现异步发布 - 订阅消息传递。然而,与在简单的 Spring Integration <channel/> 元素内添加 <queue/> 元素创建的异步消息通道不同,这些消息不会存储在内存队列中。相反,这些消息会通过 Redis 传递,这使我们能够依赖其持久化和集群支持,以及与其他非 Java 平台的互操作性。
Redis 入站通道适配器
Redis 入站通道适配器(RedisInboundChannelAdapter)以与其他入站适配器相同的方式将传入的 Redis 消息转换为 Spring 消息。
它接收特定于平台的消息(此处为 Redis),并通过使用 MessageConverter 策略将其转换为 Spring 消息。
以下示例展示了如何配置 Redis 入站通道适配器:
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例展示了一个简单但完整的 Redis 入站通道适配器配置。
请注意,上述配置依赖于 Spring 熟悉的自动发现某些 Bean 的范式。
在本例中,redisConnectionFactory 被隐式注入到适配器中。
或者,可以通过 connection-factory 属性注入自定义的 RedisConnectionFactory。
此外,请注意上述配置使用自定义 MessageConverter 注入适配器。
该方法与 JMS 类似,其中使用 MessageConverter 实例在 Redis 消息和 Spring Integration 消息负载之间进行转换。
默认值为 SimpleMessageConverter。
入站适配器可以订阅多个主题名称,因此 topics 属性中包含逗号分隔的值集合。
自版本 3.0 起,入站适配器除了现有的 topics 属性外,现在还具有 topic-patterns 属性。
该属性包含一个以逗号分隔的 Redis 主题模式集合。
有关 Redis 发布 - 订阅的更多信息,请参阅 Redis Pub/Sub。
入站适配器可以使用RedisSerializer来反序列化 Redis 消息的主体。
<int-redis:inbound-channel-adapter>的serializer属性可设置为空字符串,这将导致RedisSerializer属性的值为null。
在这种情况下,Redis 消息的原始byte[]主体将作为消息负载提供。
自 5.0 版本起,可以通过 <int-redis:inbound-channel-adapter> 的 task-executor 属性将 Executor 实例注入到入站适配器中。
此外,接收到的 Spring Integration 消息现在包含 RedisHeaders.MESSAGE_SOURCE 头信息,用于指示发布消息的来源:主题或模式。
这可用于下游的路由逻辑。
Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将 Spring Integration 的传出消息转换为 Redis 消息。
它接收 Spring Integration 消息,并通过使用 MessageConverter 策略将其转换为特定平台的消息(此处为 Redis)。
以下示例展示了如何配置 Redis 出站通道适配器:
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
该配置与 Redis 入站通道适配器类似。
该适配器隐式注入了一个 RedisConnectionFactory,其作为 Bean 的名称定义为 redisConnectionFactory。
此示例还包含可选(且自定义)的 MessageConverter(即 testConverter Bean)。
自 Spring Integration 3.0 起,<int-redis:outbound-channel-adapter>提供了topic属性的替代方案:topic-expression属性用于在运行时确定消息的 Redis 主题。
这些属性是互斥的。
Redis 队列入站通道适配器
Spring Integration 3.0 引入了一个队列入站通道适配器,用于从 Redis 列表中“弹出”消息。 默认情况下,它使用“右弹出”,但可配置为改用“左弹出”。 该适配器是消息驱动的。 它使用内部监听器线程,不使用轮询器。
以下代码列出了queue-inbound-channel-adapter的所有可用属性:
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
| 1 | 组件 Bean 的名称。
如果未提供 channel 属性,则创建并注册一个 DirectChannel 到应用上下文中,使用该 id 属性作为 Bean 名称。
在此情况下,端点本身以 id 加上 .adapter 的 Bean 名称进行注册。
(如果 Bean 名称为 thing1,则端点注册为 thing1.adapter。) |
| 2 | 将来自此端点的 Message 个实例发送到 MessageChannel。 |
| 3 | SmartLifecycle 属性用于指定该端点是否应在应用程序上下文启动后自动启动。
其默认值为 true。 |
| 4 | SmartLifecycle 属性用于指定此端点启动的阶段。
默认为 0。 |
| 5 | 对 RedisConnectionFactory Bean 的引用。
它默认为 redisConnectionFactory。 |
| 6 | 执行基于队列的 'pop' 操作以获取 Redis 消息的 Redis 列表名称。 |
| 7 | 将 MessageChannel 个实例发送到 ErrorMessage,当从端点的监听任务接收到异常时。
默认情况下,底层的 MessagePublishingErrorHandler 使用来自应用上下文的默认 errorChannel。 |
| 8 | 引用 RedisSerializer bean。
它可以为空字符串,表示"无序列化器"。
在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 有效负载发送到 channel。
默认情况下,它是一个 JdkSerializationRedisSerializer。 |
| 9 | 'pop'操作等待从队列中获取Redis消息的超时时间(毫秒)。 默认值为1秒。 |
| 10 | 监听器任务在'pop'操作发生异常后,重启前需要休眠的毫秒数。 |
| 11 | 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。
如果将此属性设置为 true,则 serializer 不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。
其默认值为 false。 |
| 12 | 对 Spring TaskExecutor(或标准 JDK 1.5+ Executor)bean 的引用。
它用于底层监听任务。
默认值为 SimpleAsyncTaskExecutor。 |
| 13 | 指定该端点是否使用“右弹出”(当true时)或“左弹出”(当false时)从 Redis 列表读取消息。
如果使用默认 Redis 队列出站通道适配器,当设置为true时,Redis 列表充当FIFO队列。
将其设置为false以配合向列表写入时使用“右推入”的软件,或实现类似栈的消息顺序。
其默认值为true。
自版本 4.3 起引入。 |
The task-executor has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the RedisQueueMessageDrivenEndpoint tries to restart the listener task after an error.
The errorChannel can be used to process those errors, to avoid restarts, but it is preferable to not expose the application to the possible deadlock situation.
See Spring Framework Reference Manual for possible TaskExecutor implementations. |
Redis 队列出站通道适配器
Spring Integration 3.0 引入了队列出站通道适配器,用于将 Spring Integration 消息“推送”到 Redis 列表。
默认情况下,它使用“左推”,但可以配置为“右推”。
以下代码示例展示了 Redis queue-outbound-channel-adapter 的所有可用属性:
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
| 1 | 组件 bean 的名称。
如果未提供 channel 属性,将创建一个 DirectChannel 并在应用程序上下文中注册该 id 属性作为 bean 名称。
在此情况下,端点将以 id 加上 .adapter 作为 bean 名称进行注册。
(如果 bean 名称为 thing1,则端点将注册为 thing1.adapter。) |
| 2 | 从该端点接收 Message 个实例的 MessageChannel。 |
| 3 | 对 RedisConnectionFactory Bean 的引用。
它默认为 redisConnectionFactory。 |
| 4 | 执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表名称。
此属性与 queue-expression 互斥。 |
| 5 | A SpEL Expression to determine the name of the Redis list.
It uses the incoming Message at runtime as the #root variable.
This attribute is mutually exclusive with queue. |
| 6 | 一个 RedisSerializer bean 引用。
它默认为 JdkSerializationRedisSerializer。
然而,对于 String 有效载荷,如果未提供 serializer 引用,则使用 StringRedisSerializer。 |
| 7 | 指定此端点是否仅将有效载荷或整个 Message 发送到 Redis 队列。
其默认值为 true。 |
| 8 | 指定此端点是否应使用“左推”(当 true 时)或“右推”(当 false 时)将消息写入 Redis 列表。
如果使用默认 Redis 队列入站通道适配器,当设置为 true 时,Redis 列表表现为一个 FIFO 队列。
将其设置为 false 以配合从列表中执行“左弹”操作的软件,或实现类似栈的消息顺序。
其默认值为 true。
自版本 4.3 起提供。 |
Redis 应用程序事件
自 Spring Integration 3.0 起,Redis 模块提供了 IntegrationEvent 的实现,而该实现本身是一个 org.springframework.context.ApplicationEvent。
RedisExceptionEvent 封装了来自 Redis 操作的异常(其中端点为事件的“源”)。
例如,<int-redis:queue-inbound-channel-adapter/> 在捕获来自 BoundListOperations.rightPop 操作的异常后会发出这些事件。
该异常可以是任何通用的 org.springframework.data.redis.RedisSystemException 或 org.springframework.data.redis.RedisConnectionFailureException。
使用 <int-event:inbound-channel-adapter/> 处理这些事件,有助于确定后台 Redis 任务的问题并采取管理措施。
Redis 消息存储
正如《企业集成模式》(EIP)书中所述,消息存储允许持久化消息。
当涉及具有消息缓冲能力(如聚合器、重排序器等)的组件且可靠性是关键考量时,这非常有用。
在 Spring Integration 中,MessageStore策略还为断言检查模式提供了基础,该模式同样在 EIP 中进行了描述。
Spring Integration 的 Redis 模块提供了RedisMessageStore。
以下示例展示了如何将其与聚合器一起使用:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
上述示例是一个 bean 配置,它期望一个 RedisConnectionFactory 作为构造函数的参数。
默认情况下,RedisMessageStore 使用 Java 序列化来序列化消息。
然而,如果需要不同的序列化技术(例如 JSON),可以将自定义序列化器设置到 RedisMessageStore 的 valueSerializer 属性中。
该框架为Message实例和MessageHeaders实例提供了Jackson序列化和反序列化实现——分别为MessageJsonDeserializer和MessageHeadersJsonSerializer。
它们必须使用SimpleModule选项进行配置,以用于ObjectMapper。
此外,为了在每次序列化复杂对象时添加类型信息,应在ObjectMapper上设置enableDefaultTyping。
该type信息随后将在反序列化过程中被使用。
该框架提供了一个名为JacksonMessagingUtils.messagingAwareMapper()的工具方法,该方法已包含上述所有属性和序列化器。
此工具方法附带trustedPackages参数,用于限制Java包以进行反序列化,从而避免安全漏洞。
默认受信任的包:java.util、java.lang、org.springframework.messaging.support、org.springframework.integration.support、org.springframework.integration.message、org.springframework.integration.store。
若要管理RedisMessageStore中的JSON序列化,必须应用如下配置:
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonMessagingUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson3JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从 4.3.12 版本开始,RedisMessageStore 支持 prefix 选项,以允许区分同一 Redis 服务器上的存储实例。
Redis 通道消息存储
The RedisMessageStore shown earlier maintains each group as a value under a single key (the group ID).
While a QueueChannel can be used for persistence, a specialized RedisChannelMessageStore is provided for that purpose (since version 4.0).
This store uses a LIST for each channel, LPUSH when sending messages, and RPOP when receiving messages.
By default, this store also uses JDK serialization, but it can be modified for the value serializer, as described earlier.
建议使用存储后端通道,而不是使用通用的 RedisMessageStore。
以下示例定义了一个 Redis 消息存储,并在带有队列的通道中使用它:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键的格式为:<storeBeanName>:<channelId>(在前面的示例中为redisMessageStore:somePersistentQueueChannel)。
此外,还提供了一个子类 RedisChannelPriorityMessageStore。
当它与 QueueChannel 配合使用时,消息会按照(先进先出)优先级顺序接收。
它使用标准的 IntegrationMessageHeaderAccessor.PRIORITY 头字段,并支持优先级值(0 - 9)。
具有其他优先级(以及没有优先级的)消息会在所有具有优先级的消息之后,按先进先出顺序检索。
这些存储仅实现BasicMessageGroupStore,未实现MessageGroupStore。
它们仅可用于为QueueChannel提供备份支持等场景。 |
Redis 元数据存储
Spring Integration 3.0 引入了一个基于 Redis 的 MetadataStore(见 元数据存储)实现。
RedisMetadataStore 可用于在应用程序重启之间维护 MetadataStore 的状态。
此类 MetadataStore 实现可与以下适配器一起使用:
要指示这些适配器使用新的 RedisMetadataStore,请声明一个名为 metadataStore 的 Spring Bean。
Feed 入站通道适配器和 feed 入站通道适配器都会自动获取并使用已声明的 RedisMetadataStore。
以下示例展示了如何声明此类 Bean:
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
The RedisMetadataStore 由 RedisProperties 提供支持。
与它的交互使用 BoundHashOperations,后者又需要为整个 Properties 存储设置一个 key。
对于 MetadataStore 的情况,该 key 起到区域的作用,这在分布式环境中非常有用,当多个应用程序使用同一个 Redis 服务器时。
默认情况下,该 key 的值为 MetaData。
从版本 4.0 开始,该存储实现了 ConcurrentMetadataStore,使其能够在多个应用实例之间可靠地共享,其中仅允许一个实例存储或修改键的值。
RedisMetadataStore.replace() 不能用于(例如,在 AbstractPersistentAcceptOnceFileListFilter 中)与 Redis 集群一起使用,因为目前不支持用于原子性的 WATCH 命令。 |
Redis Store 入站通道适配器
Redis 存储入站通道适配器是一个轮询消费者,它从 Redis 集合中读取数据并将其作为 Message 有效负载发送。
以下示例展示了如何配置 Redis 存储入站通道适配器:
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
上述示例展示了如何使用 store-inbound-channel-adapter 元素配置 Redis 存储入站通道适配器,并提供各种属性的值,例如:
-
keyorkey-expression: 正在使用的集合的键的名称。 -
collection-type: 此适配器支持的集合类型的枚举。 支持的集合包括LIST,SET,ZSET,PROPERTIES和MAP。 -
connection-factory: 对o.s.data.redis.connection.RedisConnectionFactory实例的引用。 -
redis-template: 对o.s.data.redis.core.RedisTemplate实例的引用。 -
其他所有入站适配器通用的属性(如“channel”)。
redis-template 和 connection-factory 是互斥的。 |
|
默认情况下,适配器使用
The |
由于它具有 key 的字面值,前面的示例相对简单且静态。
有时,必须根据某些条件在运行时更改键的值。
为此,请使用 key-expression 代替,其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,还可以对从 Redis 集合中读取并已成功处理的数据进行一些后处理。
例如,在处理完成后,该值可以被移动或删除。
此类逻辑可以使用事务同步功能来实现。
以下示例使用了 key-expression 和事务同步:
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
轮询器可以通过使用 transactional 元素使其具有事务性。
该元素可以引用一个真实的事务管理器,例如,如果流程的其他部分调用了 JDBC。
如果没有“真实”的事务,可以使用 o.s.i.transaction.PseudoTransactionManager 代替,它是 Spring PlatformTransactionManager 的一个实现,能够在没有实际事务时启用 Redis 适配器的事务同步功能。
| 这并不会使 Redis 活动本身具有事务性。 它允许在成功(提交)之前或之后,或在失败(回滚)之后执行操作同步。 |
一旦轮询器具有事务性,就可以在transactional元素上添加一个o.s.i.transaction.TransactionSynchronizationFactory的实例。TransactionSynchronizationFactory 创建 TransactionSynchronization 的实例。为了方便,暴露了一个默认的基于 SpEL 的TransactionSynchronizationFactory,它允许配置 SpEL 表达式,其执行与事务协调(同步)。支持提交前、提交后和回滚后的表达式,以及对应的通道(每种事件一个),用于发送评估结果(如果有)。对于每个子元素,可以指定 expression 和 channel 属性。如果仅存在channel属性,则接收到的消息将作为特定同步场景的一部分发送到该处。如果仅存在 expression 属性且表达式的结果为非空值,则会生成一条以该结果为有效负载的消息,并将其发送到默认通道 (NullChannel),同时出现在日志中(级别为 DEBUG)。如果表达式的结果为 null 或 void,则不会生成任何消息。
The RedisStoreMessageSource adds a store attribute with a RedisStore instance bound to the transaction IntegrationResourceHolder, which can be accessed from a TransactionSynchronizationProcessor implementation.
有关事务同步的更多信息,请参见 事务同步。
RedisStore 出站通道适配器
RedisStore 出站通道适配器允许将消息负载写入 Redis 集合,如下例所示:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
上述配置通过使用 store-inbound-channel-adapter 元素为 Redis 存储出站通道适配器进行设置。
它为各种属性提供值,例如:
-
keyorkey-expression: 正在使用的集合的键的名称。 -
extract-payload-elements: 如果设置为true(默认值)且负载是“多值”对象(即Collection或Map)的实例,则使用“addAll”和“putAll”语义进行存储。 否则,如果设置为false,则无论其类型如何,负载都作为单个条目进行存储。 如果负载不是“多值”对象的实例,则此属性的值将被忽略,并且负载始终作为单个条目进行存储。 -
collection-type: 此适配器支持的Collection类型的枚举。 支持的集合包括LIST、SET、ZSET、PROPERTIES和MAP。 -
map-key-expression: 返回待存储条目键名的 SpEL 表达式。 仅当collection-type为MAP或PROPERTIES且 'extract-payload-elements' 为 false 时生效。 -
connection-factory: 对o.s.data.redis.connection.RedisConnectionFactory实例的引用。 -
redis-template: 对o.s.data.redis.core.RedisTemplate实例的引用。 -
其他所有入站适配器通用的属性(如“channel”)。
redis-template 和 connection-factory 是互斥的。 |
默认情况下,适配器使用 StringRedisTemplate。
这为键、值、哈希键和哈希值使用 StringRedisSerializer 个实例。
然而,如果将 extract-payload-elements 设置为 false,则会使用一个 RedisTemplate,其中键和哈希键有 StringRedisSerializer 个实例,值和哈希值有 JdkSerializationRedisSerializer 个实例。
在使用 JDK 序列化器时,重要的是要理解:Java 序列化用于所有值,无论该值实际上是否为集合。
如果需要更精细地控制值的序列化,可以提供一个自定义的 RedisTemplate,而不是依赖这些默认设置。 |
由于它具有key和其他属性的字面量值,前面的示例相对简单且静态。
有时,这些值可能会根据某些条件在运行时动态更改。
为此,提供了它们的-expression等效项(key-expression、map-key-expression等),其中表达式可以是任何有效的SpEL表达式。
Redis 出站命令网关
Spring Integration 4.0 引入了 Redis 命令网关,允许通过通用的 RedisConnection#execute 方法执行任何标准的 Redis 命令。
以下代码片段展示了 Redis 出站网关的可用属性:
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
| 1 | 从该端点接收 Message 个实例的 MessageChannel。 |
| 2 | The MessageChannel where this endpoint sends reply Message instances. |
| 3 | 指定此出站网关是否必须返回非空值。
它默认为 true。
当 Redis 返回 null 值时,将抛出 ReplyRequiredException。 |
| 4 | 等待回复消息发送的超时时间(以毫秒为单位)。 通常应用于基于队列的受限回复通道。 |
| 5 | 对 RedisConnectionFactory Bean 的引用。
默认值为 redisConnectionFactory。
它与 redis-template 属性互斥。 |
| 6 | 对 RedisTemplate bean 的引用。
它与 connection-factory 属性互斥。 |
| 7 | 对 org.springframework.data.redis.serializer.RedisSerializer 实例的引用。
如果必要,它用于将每个命令参数序列化为 byte[]。 |
| 8 | 返回命令键的 SpEL 表达式。
它默认为 redis_command 消息头。
它不能求值为 null。 |
| 9 | 以逗号分隔的 SpEL 表达式,将作为命令参数进行求值。
与 arguments-strategy 属性互斥。
如果未提供任一属性,则将使用 payload 作为命令参数。
参数表达式可以求值为 'null',以支持可变数量的参数。 |
| 10 | 一个 boolean 标志,用于指定当配置了 argument-expressions 时,评估后的 Redis 命令字符串是否作为 #cmd 变量在 o.s.i.redis.outbound.ExpressionArgumentsStrategy 的表达式求值上下文中可用。
否则,此属性将被忽略。 |
| 11 | 对 o.s.i.redis.outbound.ArgumentsStrategy 实例的引用。
它与 argument-expressions 属性互斥。
如果未提供任一属性,则将使用 payload 作为命令参数。 |
<int-redis:outbound-gateway> 可用作通用组件来执行任何所需的 Redis 操作。
以下示例展示了如何从 Redis 原子数中获取递增值:
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message 负载的名称应为 redisCounter,该名称可由 org.springframework.data.redis.support.atomic.RedisAtomicInteger bean 定义提供。
The RedisConnection#execute 方法的返回类型是一个泛型 Object。
实际结果取决于命令类型。
例如,MGET 返回一个 List<byte[]>。
有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范。
Redis 队列出站网关
Spring Integration 引入了 Redis 队列出站网关,以执行请求和回复场景。
它将一个对话 UUID 推送到提供的 queue,将带有该 UUID 作为键的值推送到 Redis 列表,并等待来自键为 UUID 加上 .reply 的 Redis 列表的回复。
每次交互使用不同的 UUID。
以下列表显示了 Redis 出站网关的可用属性:
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
| 1 | 从该端点接收 Message 个实例的 MessageChannel。 |
| 2 | The MessageChannel where this endpoint sends reply Message instances. |
| 3 | 指定此出站网关是否必须返回非空值。
默认情况下,该值为 false。
否则,当 Redis 返回 null 值时,将抛出 ReplyRequiredException。 |
| 4 | 等待回复消息发送的超时时间(以毫秒为单位)。 通常应用于基于队列的受限回复通道。 |
| 5 | 对 RedisConnectionFactory bean 的引用。
默认值为 redisConnectionFactory。
它与 'redis-template' 属性互斥。 |
| 6 | 发送到出站网关的 Redis 列表的名称 UUID。 |
| 7 | 当注册多个网关时,此出站网关的优先级顺序。 |
| 8 | 引用 RedisSerializer bean。
它可以为空字符串,表示“无序列化器”。
在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 负载发送到 channel。
默认情况下,它是一个 JdkSerializationRedisSerializer。 |
| 9 | 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。
如果将此属性设置为 true,则 serializer 不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。 |
Redis 队列入站网关
Spring Integration 4.1 引入了 Redis 队列入站网关,用于执行请求和回复场景。
它从提供的 queue 中弹出一个对话 UUID,从 Redis 列表中弹出以该 UUID 为键的值,并将回复推送到 Redis 列表,其键为 UUID 加上 .reply。
以下代码清单展示了 Redis 队列入站网关的可用属性:
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
| 1 | The MessageChannel where this endpoint sends Message instances created from the Redis data. |
| 2 | 从该端点等待回复的 MessageChannel 为 Message 个实例。
可选 - replyChannel 标头仍在使用中。 |
| 3 | 对 Spring TaskExecutor(或标准 JDK Executor)Bean 的引用。
它用于底层监听任务。
默认值为 SimpleAsyncTaskExecutor。 |
| 4 | 等待回复消息发送的超时时间(以毫秒为单位)。 通常应用于基于队列的受限回复通道。 |
| 5 | 对 RedisConnectionFactory Bean 的引用。
默认值为 redisConnectionFactory。
它与 redis-template 属性互斥。 |
| 6 | 用于对话的 Redis 列表的名称 UUID。 |
| 7 | 当注册多个网关时,此入站网关的优先级顺序。 |
| 8 | 对 RedisSerializer Bean 的引用。
它可以是空字符串,表示“无序列化器”。
在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 有效载荷发送给 channel。
默认情况下为 JdkSerializationRedisSerializer。
(注意:在 4.3 版本之前的发行版中,默认值为 StringRedisSerializer。
若要恢复该行为,请提供一个对 StringRedisSerializer 的引用)。 |
| 9 | 等待接收到的消息被获取的超时时间(以毫秒为单位)。 它通常应用于基于队列的受限请求通道。 |
| 10 | 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。
如果将此属性设置为 true,则 serializer 不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。 |
| 11 | 监听器任务在“右弹出”操作发生异常后,重启前应休眠的时间(以毫秒为单位)。 |
The task-executor has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the RedisQueueMessageDrivenEndpoint tries to restart the listener task after an error.
The errorChannel can be used to process those errors, to avoid restarts, but it is preferable to not expose the application to the possible deadlock situation.
See Spring Framework Reference Manual for possible TaskExecutor implementations. |
Redis Stream 出站通道适配器
Spring Integration 5.4 引入了响应式 Redis Stream 出站通道适配器,用于将消息负载写入 Redis 流。
出站通道适配器使用 ReactiveStreamOperations.add(…) 向流中添加 Record。
以下示例展示了如何使用 Java 配置和 Service 类来配置 Redis Stream 出站通道适配器。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
| 1 | 使用 ReactiveRedisStreamMessageHandler 和流名称创建 ReactiveRedisConnectionFactory 的实例以添加记录。
另一个构造函数变体基于 SpEL 表达式,用于针对请求消息评估流键。 |
| 2 | 设置 RedisSerializationContext 用于在将记录键和值添加到流之前对其进行序列化。 |
| 3 | 设置 HashMapper,该设置提供 Java 类型与 Redis 哈希/映射之间的契约。 |
| 4 | 如果为 'true',通道适配器将从流记录添加的请求消息中提取负载。
或者将整个消息作为值使用。
其默认值为 true。 |
从版本 6.5 开始,ReactiveRedisStreamMessageHandler 提供了一个setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction),用于基于请求消息为内部ReactiveStreamOperations.add(Record<K, ?> record, XAddOptions xAddOptions)调用构建RedisStreamCommands.XAddOptions。
Redis Stream 入站通道适配器
Spring Integration 5.4 引入了用于从 Redis Stream 读取消息的响应式流入站通道适配器。
入站通道适配器根据自动确认标志使用 StreamReceiver.receive(…) 或 StreamReceiver.receiveAutoAck() 来从 Redis Stream 读取记录。
以下示例展示了如何使用 Java 配置来设置 Redis Stream 入站通道适配器。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
| 1 | 使用 ReactiveRedisConnectionFactory 和流键创建 ReactiveRedisStreamMessageProducer 的实例以读取记录。 |
| 2 | 一个使用响应式基础设施消费 Redis 流的 StreamReceiver.StreamReceiverOptions。 |
| 3 | SmartLifecycle 属性用于指定该端点是否应在应用程序上下文启动后自动启动。
其默认值为 true。
若设置为 false,则 RedisStreamMessageProducer 应通过手动方式启动 messageProducer.start()。 |
| 4 | 如果为false,则接收的消息不会自动确认。
消息的确认将延迟到客户端消费消息时进行。
默认值为true。 |
| 5 | 如果为true,将创建一个消费者组。
在创建消费者组时,如果尚未存在,也会创建对应的流(stream)。
消费者组用于跟踪消息传递并区分不同的消费者。
默认值为false。 |
| 6 | 设置消费者组名称。 它默认为定义的 Bean 名称。 |
| 7 | 设置消费者名称。
从组 my-group 读取一条消息,其值为 my-consumer。 |
| 8 | 用于从此端点发送消息的消息通道。 |
| 9 | 定义读取消息的偏移量。
它默认为 ReadOffset.latest()。 |
| 10 | 如果为 Record,则通道适配器将从 Record 中提取负载值。
否则,整个 Record 将作为负载使用。
其默认值为 true。 |
如果将 autoAck 设置为 false,则 Redis Stream 中的 Record 不会由 Redis 驱动程序自动确认,而是会在消息中添加一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头信息,以便与以 SimpleAcknowledgment 实例为值的消息一起生产。
当基于此类记录完成消息的业务逻辑时,目标集成流有责任调用其 acknowledge() 回调。
即使反序列化过程中发生异常且配置了 errorChannel,也需要类似的逻辑。
因此,目标错误处理器必须决定是确认(ack)还是否定确认(nack)该失败的消息。
除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 之外,ReactiveRedisStreamMessageProducer 还会将这些头信息填充到要生产的消息中:RedisHeaders.STREAM_KEY、RedisHeaders.STREAM_MESSAGE_ID、RedisHeaders.CONSUMER_GROUP 和 RedisHeaders.CONSUMER。
从 5.5 版本开始,可以在 ReactiveRedisStreamMessageProducer 上显式配置 StreamReceiver.StreamReceiverOptionsBuilder 个选项,包括新引入的 onErrorResume 函数。如果 Redis Stream 消费者在发生反序列化错误时仍需继续轮询,则必须使用该函数。
默认函数会向错误通道(如果已提供)发送一条消息,并如上所述对失败的消息进行可能的确认。
所有这些 StreamReceiver.StreamReceiverOptionsBuilder 都与外部提供的 StreamReceiver.StreamReceiverOptions 互斥。
Redis 锁注册表
Spring Integration 4.0 引入了 RedisLockRegistry。
某些组件(例如聚合器和重排序器)使用从 LockRegistry 实例获取的锁,以确保同一时间只有一个线程操作一个组。
DefaultLockRegistry 在单个组件内执行此功能。
可以在这些组件上配置外部锁注册表。
当它与共享的 MessageGroupStore 一起使用时,可以设置 RedisLockRegistry 以在多个应用程序实例之间提供此功能,从而确保同一时间只有一个实例能够操作该组。
当一个锁被本地线程释放后,另一个本地线程通常可以立即获取该锁。 如果一个锁是由使用不同注册表实例的线程释放的,则获取该锁可能需要长达100毫秒的时间。
为避免“挂起”锁(当服务器故障时),此注册表中的锁在默认60秒后过期,但可在注册表中配置。 锁通常被持有的时间要短得多。
| 由于键可能会过期,尝试解锁一个已过期的锁会抛出异常。 然而,此类锁所保护的资源可能已受到损害,因此这类异常应被视为严重问题。 过期时间应设置为足够大以防止此情况发生,但同时又要设置得足够低,以便在服务器故障后能在合理时间内恢复锁。 |
从 5.0 版本开始,RedisLockRegistry 实现了 ExpirableLockRegistry,该功能会移除超过 age 时间前获取且当前未锁定的锁。
从版本 5.5.6 开始,RedisLockRegistry 支持通过 RedisLockRegistry.setCacheCapacity() 自动清理 RedisLockRegistry.locks 中 RedisLocks 的缓存。
请参阅其 JavaDocs 以获取更多信息。
从版本 5.5.13 开始,RedisLockRegistry 暴露了一个 setRedisLockType(RedisLockType) 选项,用于确定 Redis 锁获取应在哪种模式下进行:
-
RedisLockType.SPIN_LOCK- 锁通过周期性循环(100ms)检查是否可获取而获得。 默认值。 -
RedisLockType.PUB_SUB_LOCK- 锁已通过 Redis 发布/订阅订阅获得。
发布 - 订阅模式是首选模式——客户端与 Redis 服务器之间的网络通信更少,性能更高——当其他进程中的订阅者收到解锁通知时,锁会立即获取。 然而,Redis 在 Master/Replica(主从)连接中不支持发布 - 订阅功能(例如在 AWS ElastiCache 环境中),因此选择忙等待模式作为默认方式,以确保注册表在任何环境下都能正常工作。
从 6.4 版本开始,当锁的所有权过期时,RedisLockRegistry.RedisLock.unlock() 方法不再抛出 IllegalStateException,而是抛出 ConcurrentModificationException。
从 6.4 版本开始,会添加一个 RedisLockRegistry.setRenewalTaskScheduler() 参数来配置用于定期续期锁的调度器。
当设置该参数后,在成功获取锁之后,锁将在过期时间的每 1/3 时刻自动续期,直到锁被释放或 Redis 键被移除。
从 7.0 版本开始,RedisLock 实现了 DistributedLock 接口,以支持锁状态数据的自定义过期时间(TTL)功能。
现在可以使用 lock(Duration ttl) 或 tryLock(long time, TimeUnit unit, Duration ttl) 方法获取一个带有指定过期时间(TTL)值的 RedisLock。
RedisLockRegistry 现在提供了新的 renewLock(Object lockKey, Duration ttl) 方法,允许使用自定义的过期时间值续期锁。
AWS ElastiCache for Valkey 在集群模式下的支持
从版本 6.4.9/6.5.4/7.0.0 开始,RedisLockRegistry 支持 AWS Elasticache for Valkey 的集群模式。
在此版本的 Valkey(Redis 的即插即用替代品)中,所有 PubSub 操作(PUBLISH、SUBSCRIBE 等)在内部都使用其分片变体(SPUBLISH、SSUBSCRIBE 等)。
如果出现以下形式的错误:
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Script attempted to access keys that do not hash to the same slot script: b2dedc0ab01c17f9f20e3e6ddb62dcb6afbed0bd, on @user_script:3.
"在unlock步骤的RedisLockRegistry中,必须提供一个包含井号{…}的锁键,以确保unlock脚本中的所有操作都被哈希到相同的集群槽/分片,例如:
RedisLockRegistry lockRegistry = new RedisLockRegistry("my-lock-key{choose_your_tag}");
lockRegistry.lock();
# critical section
lockRegistry.unlock();