如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

Redis 支持

Spring Integration 2.1 引入了对 Redis("一个开源的高级键值存储")的支持。 该支持以基于 Redis 的 MessageStore 形式提供,以及通过 Redis 的 PUBLISH, SUBSCRIBEUNSUBSCRIBE 命令支持的发布 - 订阅消息适配器。spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.4.10"

您还需要包含 Redis 客户端依赖,例如 Lettuce。spring-doc.cadn.net.cn

要下载、安装和运行 Redis,请参阅 Redis 文档spring-doc.cadn.net.cn

连接至 Redis

要开始与 Redis 交互,您首先需要连接到它。 Spring Integration 利用另一个 Spring 项目提供的支持,即 Spring Data Redis,该项目提供了典型的 Spring 构造:ConnectionFactoryTemplate。 这些抽象简化了与多种 Redis 客户端 Java API 的集成。 目前,Spring Data Redis 支持 JedisLettucespring-doc.cadn.net.cn

使用RedisConnectionFactory

要连接到 Redis,您可以使用 RedisConnectionFactory 接口的任一实现。 以下代码片段展示了该接口的定义:spring-doc.cadn.net.cn

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

    /**
     * Provides a suitable connection for interacting with Redis.
     * @return connection for interacting with Redis.
     */
    RedisConnection getConnection();
}

以下示例展示了如何在 Java 中创建一个 LettuceConnectionFactoryspring-doc.cadn.net.cn

LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();

下面的示例展示了如何在 Spring 的 XML 配置中创建 LettuceConnectionFactoryspring-doc.cadn.net.cn

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

RedisConnectionFactory的实现提供了一组属性,例如端口和主机,您可以根据需要进行设置。 一旦您拥有RedisConnectionFactory的实例,就可以创建RedisTemplate的实例,并使用RedisConnectionFactory对其进行注入。spring-doc.cadn.net.cn

使用RedisTemplate

与 Spring 中的其他模板类(如 JdbcTemplateJmsTemplate)一样,RedisTemplate 是一个辅助类,用于简化 Redis 数据访问代码。 有关 RedisTemplate 及其变体(如 StringRedisTemplate)的更多信息,请参阅 Spring Data Redis 文档spring-doc.cadn.net.cn

下面的示例展示了如何在 Java 中创建 RedisTemplate 的实例:spring-doc.cadn.net.cn

RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);

以下示例展示了如何在 Spring 的 XML 配置中创建 RedisTemplate 的实例:spring-doc.cadn.net.cn

<bean id="redisTemplate"
         class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

使用 Redis 进行消息传递

介绍 中所提及,Redis 通过其 PUBLISHSUBSCRIBEUNSUBSCRIBE 命令支持发布 - 订阅消息传递。 与 JMS 和 AMQP 类似,Spring Integration 提供了用于通过 Redis 发送和接收消息的消息通道和适配器。spring-doc.cadn.net.cn

Redis 发布/订阅通道

与 JMS 类似,也存在生产者和消费者都属于同一应用程序并在同一进程中运行的情况。 您可以通过使用一对入站和出站通道适配器来实现这一点。 然而,正如 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决此用例。 您可以创建一个发布 - 订阅通道,如下例所示:spring-doc.cadn.net.cn

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>

一个 publish-subscribe-channel 的行为与来自主 Spring Integration 命名空间的普通 <publish-subscribe-channel/> 元素非常相似。它可由任何端点的 input-channeloutput-channel 属性引用。区别在于,此通道由一个 Redis 主题名称支持:即由 topic-name 属性指定的 String 值。然而,与 JMS 不同,该主题无需预先创建,甚至无需由 Redis 自动创建。在 Redis 中,主题(topics)是简单的 String 值,其作用相当于地址。生产者和消费者可以使用相同的 String 值作为其主题名称进行通信。对频道的简单订阅意味着生产端点和消费端点之间可以实现异步发布 - 订阅消息传递。然而,与在简单的 Spring Integration <channel/> 元素内添加 <queue/> 元素创建的异步消息通道不同,这些消息不会存储在内存队列中。相反,这些消息通过 Redis 传递,这使得您可以依赖其持久化和集群支持,以及与其他非 Java 平台的互操作性。spring-doc.cadn.net.cn

Redis 入站通道适配器

Redis 入站通道适配器(RedisInboundChannelAdapter)以与其他入站适配器相同的方式将传入的 Redis 消息转换为 Spring 消息。 它接收特定于平台的消息(此处为 Redis),并通过使用 MessageConverter 策略将其转换为 Spring 消息。 以下示例展示了如何配置 Redis 入站通道适配器:spring-doc.cadn.net.cn

<int-redis:inbound-channel-adapter id="redisAdapter"
       topics="thing1, thing2"
       channel="receiveChannel"
       error-channel="testErrorChannel"
       message-converter="testConverter" />

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

上述示例展示了一个简单但完整的 Redis 入站通道适配器配置。 请注意,上述配置依赖于熟悉的 Spring 自动发现某些 bean 的范式。 在这种情况下,redisConnectionFactory 被隐式注入到适配器中。 您可以通过使用 connection-factory 属性显式地指定它。spring-doc.cadn.net.cn

此外,请注意上述配置使用自定义 MessageConverter 注入适配器。 该方法与 JMS 类似,其中使用 MessageConverter 实例在 Redis 消息和 Spring Integration 消息负载之间进行转换。 默认值为 SimpleMessageConverterspring-doc.cadn.net.cn

入站适配器可以订阅多个主题名称,因此 topics 属性中包含逗号分隔的值集合。spring-doc.cadn.net.cn

自版本 3.0 起,入站适配器除了现有的 topics 属性外,现在还具有 topic-patterns 属性。 该属性包含一个以逗号分隔的 Redis 主题模式集合。 有关 Redis 发布 - 订阅的更多信息,请参阅 Redis Pub/Subspring-doc.cadn.net.cn

入站适配器可以使用RedisSerializer来反序列化 Redis 消息的主体。 <int-redis:inbound-channel-adapter>serializer属性可设置为空字符串,这将导致RedisSerializer属性的值为null。 在这种情况下,Redis 消息的原始byte[]主体将作为消息负载提供。spring-doc.cadn.net.cn

从 5.0 版本开始,您可以通过 <int-redis:inbound-channel-adapter>task-executor 属性向入站适配器提供 Executor 实例。 此外,接收到的 Spring Integration 消息现在包含 RedisHeaders.MESSAGE_SOURCE 头信息,用于指示发布消息的来源:主题或模式。 您可以在下游逻辑中使用此信息进行路由。spring-doc.cadn.net.cn

Redis 出站通道适配器

Redis 出站通道适配器以与其他出站适配器相同的方式将 Spring Integration 的传出消息转换为 Redis 消息。 它接收 Spring Integration 消息,并通过使用 MessageConverter 策略将其转换为特定平台的消息(此处为 Redis)。 以下示例展示了如何配置 Redis 出站通道适配器:spring-doc.cadn.net.cn

<int-redis:outbound-channel-adapter id="outboundAdapter"
    channel="sendChannel"
    topic="thing1"
    message-converter="testConverter"/>

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

该配置与 Redis 入站通道适配器类似。 该适配器隐式注入了一个 RedisConnectionFactory,其作为 Bean 的名称定义为 redisConnectionFactory。 此示例还包含可选(且自定义)的 MessageConverter(即 testConverter Bean)。spring-doc.cadn.net.cn

自 Spring Integration 3.0 起,<int-redis:outbound-channel-adapter> 提供了 topic 属性的替代方案:您可以使用 topic-expression 属性在运行时确定消息的 Redis 主题。 这些属性是互斥的。spring-doc.cadn.net.cn

Redis 队列入站通道适配器

Spring Integration 3.0 引入了一个队列入站通道适配器,用于从 Redis 列表中“弹出”消息。 默认情况下,它使用“右弹”,但您可以配置为使用“左弹”。 该适配器是消息驱动的。 它使用内部监听器线程,不使用轮询器。spring-doc.cadn.net.cn

以下代码列出了queue-inbound-channel-adapter的所有可用属性:spring-doc.cadn.net.cn

<int-redis:queue-inbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    auto-startup=""  (3)
                    phase=""  (4)
                    connection-factory=""  (5)
                    queue=""  (6)
                    error-channel=""  (7)
                    serializer=""  (8)
                    receive-timeout=""  (9)
                    recovery-interval=""  (10)
                    expect-message=""  (11)
                    task-executor=""  (12)
                    right-pop=""/>  (13)
1 组件 Bean 的名称。 如果您未提供 channel 属性,将创建一个 DirectChannel 并在应用程序上下文中注册该 Bean,其 Bean 名称为此 id 属性的值。 在此情况下,端点本身将以 Bean 名称 id 加上 .adapter 进行注册。 (如果 Bean 名称为 thing1,则端点将注册为 thing1.adapter。)
2 将来自此端点的 Message 个实例发送到 MessageChannel
3 SmartLifecycle 属性用于指定该端点是否应在应用上下文启动后自动启动。 其默认值为 true
4 SmartLifecycle 属性用于指定此端点启动的阶段。 默认为 0
5 RedisConnectionFactory Bean 的引用。 它默认为 redisConnectionFactory
6 执行基于队列的 'pop' 操作以获取 Redis 消息的 Redis 列表名称。
7 MessageChannel 个实例发送到 ErrorMessage,当从端点的监听任务接收到异常时。 默认情况下,底层的 MessagePublishingErrorHandler 使用来自应用上下文的默认 errorChannel
8 引用 RedisSerializer bean。 它可以为空字符串,表示"无序列化器"。 在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 有效负载发送到 channel。 默认情况下,它是一个 JdkSerializationRedisSerializer
9 'pop'操作等待从队列中获取Redis消息的超时时间(毫秒)。 默认值为1秒。
10 监听器任务在'pop'操作发生异常后,重启前需要休眠的毫秒数。
11 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。 如果将此属性设置为 true,则 serializer 不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。 其默认值为 false
12 对 Spring TaskExecutor(或标准 JDK 1.5+ Executor)bean 的引用。 它用于底层监听任务。 默认值为 SimpleAsyncTaskExecutor
13 指定该端点是否使用“右弹出”(当true时)或“左弹出”(当false时)从 Redis 列表读取消息。 如果使用默认 Redis 队列出站通道适配器,当设置为true时,Redis 列表充当FIFO队列。 将其设置为false以配合向列表写入时使用“右推入”的软件,或实现类似栈的消息顺序。 其默认值为true。 自版本 4.3 起引入。
The task-executor has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the RedisQueueMessageDrivenEndpoint tries to restart the listener task after an error. The errorChannel can be used to process those errors, to avoid restarts, but it is preferable to not expose your application to the possible deadlock situation. See Spring Framework Reference Manual for possible TaskExecutor implementations.

Redis 队列出站通道适配器

Spring Integration 3.0 引入了一个队列出站通道适配器,用于将 Spring Integration 消息“推送”到 Redis 列表。 默认情况下,它使用“左侧推送”,但您可以配置为使用“右侧推送”。 以下代码片段展示了 Redis queue-outbound-channel-adapter 的所有可用属性:spring-doc.cadn.net.cn

<int-redis:queue-outbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    connection-factory=""  (3)
                    queue=""  (4)
                    queue-expression=""  (5)
                    serializer=""  (6)
                    extract-payload=""  (7)
                    left-push=""/>  (8)
1 组件 Bean 的名称。 如果您未提供 channel 属性,将创建一个 DirectChannel 并在应用程序上下文中注册该 Bean,其 Bean 名称为 id 属性的值。 在这种情况下,端点将以 id 加上 .adapter 作为 Bean 名称进行注册。 (如果 Bean 名称为 thing1,则端点将被注册为 thing1.adapter。)
2 从该端点接收 Message 个实例的 MessageChannel
3 RedisConnectionFactory Bean 的引用。 它默认为 redisConnectionFactory
4 执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表名称。 此属性与 queue-expression 互斥。
5 A SpEL Expression to determine the name of the Redis list. It uses the incoming Message at runtime as the #root variable. This attribute is mutually exclusive with queue.
6 一个 RedisSerializer bean 引用。 它默认为 JdkSerializationRedisSerializer。 然而,对于 String 有效载荷,如果未提供 serializer 引用,则使用 StringRedisSerializer
7 指定此端点是否仅将有效载荷或整个 Message 发送到 Redis 队列。 其默认值为 true
8 指定此端点是否应使用“左推”(当 true 时)或“右推”(当 false 时)将消息写入 Redis 列表。 如果使用默认 Redis 队列入站通道适配器,当设置为 true 时,Redis 列表表现为一个 FIFO 队列。 将其设置为 false 以配合从列表中执行“左弹”操作的软件,或实现类似栈的消息顺序。 其默认值为 true。 自版本 4.3 起提供。

Redis 应用程序事件

自 Spring Integration 3.0 起,Redis 模块提供了 IntegrationEvent 的实现,而该实现本身是 org.springframework.context.ApplicationEventRedisExceptionEvent 封装了来自 Redis 操作的异常(其中端点为事件的“来源”)。 例如,<int-redis:queue-inbound-channel-adapter/> 在捕获来自 BoundListOperations.rightPop 操作的异常后会发出这些事件。 该异常可以是任何通用的 org.springframework.data.redis.RedisSystemExceptionorg.springframework.data.redis.RedisConnectionFailureException。 使用 <int-event:inbound-channel-adapter/> 处理这些事件有助于识别后台 Redis 任务的问题并采取管理措施。spring-doc.cadn.net.cn

Redis 消息存储

正如《企业集成模式》(EIP) 书中所述,消息存储允许您持久化消息。 当关注可靠性时,这对于处理具有消息缓冲能力的组件(如聚合器、重排序器等)非常有用。 在 Spring Integration 中,MessageStore策略还为断言检查模式提供了基础,该模式也在 EIP 中进行了描述。spring-doc.cadn.net.cn

Spring Integration 的 Redis 模块提供了RedisMessageStore。 以下示例展示了如何将其与聚合器一起使用:spring-doc.cadn.net.cn

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="redisMessageStore"/>

上述示例是一个 bean 配置,它期望一个 RedisConnectionFactory 作为构造函数的参数。spring-doc.cadn.net.cn

默认情况下,RedisMessageStore 使用 Java 序列化来序列化消息。 然而,如果您想使用不同的序列化技术(例如 JSON),您可以通过设置 RedisMessageStorevalueSerializer 属性来提供您自己的序列化器。spring-doc.cadn.net.cn

从版本 4 开始。3.10,该框架为Message实例和MessageHeaders实例提供了Jackson序列化和反序列化实现——分别为MessageJacksonDeserializerMessageHeadersJacksonSerializer。它们必须为 ObjectMapper 配置 SimpleModule 个选项。此外,您应该在 ObjectMapper 上设置 enableDefaultTyping,以便为每个序列化的复杂对象添加类型信息(如果您信任该来源)。该类型信息随后在反序列化过程中被使用。该框架提供了一个名为 JacksonJsonUtils.messagingAwareMapper() 的工具方法,该方法已包含前述所有属性和序列化器。此实用方法带有 trustedPackages 个参数,用于限制反序列化时的 Java 包,以避免安全漏洞。默认受信任的包:java.util, java.lang, org.springframework.messaging.support, org.springframework.integration.support, org.springframework.integration.message, org.springframework.integration.store。要在 RedisMessageStore 中管理 JSON 序列化,您必须按照以下示例的方式进行配置:spring-doc.cadn.net.cn

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);

从 4.3.12 版本开始,RedisMessageStore 支持 prefix 选项,以允许区分同一 Redis 服务器上的存储实例。spring-doc.cadn.net.cn

Redis 通道消息存储

The RedisMessageStore shown earlier maintains each group as a value under a single key (the group ID). While you can use this to back a QueueChannel for persistence, a specialized RedisChannelMessageStore is provided for that purpose (since version 4.0). This store uses a LIST for each channel, LPUSH when sending messages, and RPOP when receiving messages. By default, this store also uses JDK serialization, but you can modify the value serializer, as described earlier.spring-doc.cadn.net.cn

我们推荐使用此存储后端通道,而不是使用通用的 RedisMessageStore。 以下示例定义了一个 Redis 消息存储,并在带有队列的通道中使用它:spring-doc.cadn.net.cn

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
	<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="redisMessageStore"/>
<int:channel>

用于存储数据的键的格式为:<storeBeanName>:<channelId>(在前面的示例中为redisMessageStore:somePersistentQueueChannel)。spring-doc.cadn.net.cn

此外,还提供了子类 RedisChannelPriorityMessageStore。 当您将此与 QueueChannel 一起使用时,消息将按(先进先出)优先级顺序接收。 它使用标准的 IntegrationMessageHeaderAccessor.PRIORITY 标头,并支持优先级值(0 - 9)。 具有其他优先级的消息(以及没有优先级的消息)将在任何具有优先级的消息之后按 FIFO 顺序检索。spring-doc.cadn.net.cn

这些存储仅实现BasicMessageGroupStore,未实现MessageGroupStore。 它们仅可用于为QueueChannel提供备份支持等场景。

Redis 元数据存储

Spring Integration 3.0 引入了一种基于 Redis 的新的MetadataStore(参见元数据存储)实现。 您可以使用RedisMetadataStore来在应用程序重启后维护MetadataStore的状态。 您可以将此新的MetadataStore实现与适配器配合使用,例如:spring-doc.cadn.net.cn

要指示这些适配器使用新的 RedisMetadataStore,请声明一个名为 metadataStore 的 Spring Bean。 Feed 入站通道适配器和 feed 入站通道适配器都会自动获取并使用已声明的 RedisMetadataStore。 以下示例展示了如何声明此类 Bean:spring-doc.cadn.net.cn

<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

The RedisMetadataStoreRedisProperties 提供支持。 与它的交互使用 BoundHashOperations,后者又需要为整个 Properties 存储设置一个 key。 对于 MetadataStore 的情况,该 key 起到区域的作用,这在分布式环境中非常有用,当多个应用程序使用同一个 Redis 服务器时。 默认情况下,该 key 的值为 MetaDataspring-doc.cadn.net.cn

从版本 4.0 开始,该存储实现了 ConcurrentMetadataStore,使其能够在多个应用实例之间可靠地共享,其中仅允许一个实例存储或修改键的值。spring-doc.cadn.net.cn

您无法在 Redis 集群中使用RedisMetadataStore.replace()(例如,在AbstractPersistentAcceptOnceFileListFilter中),因为目前不支持用于原子性的WATCH命令。

Redis Store 入站通道适配器

Redis 存储入站通道适配器是一个轮询消费者,它从 Redis 集合中读取数据并将其作为 Message 有效负载发送。 以下示例展示了如何配置 Redis 存储入站通道适配器:spring-doc.cadn.net.cn

<int-redis:store-inbound-channel-adapter id="listAdapter"
    connection-factory="redisConnectionFactory"
    key="myCollection"
    channel="redisChannel"
    collection-type="LIST" >
    <int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>

上述示例展示了如何使用 store-inbound-channel-adapter 元素配置 Redis 存储入站通道适配器,并提供各种属性的值,例如:spring-doc.cadn.net.cn

您不能同时设置 redis-templateconnection-factory

默认情况下,该适配器使用 StringRedisTemplate。 这为键、值、哈希键和哈希值使用 StringRedisSerializer 个实例。 如果您的 Redis 存储包含使用其他技术序列化的对象,则必须提供一个配置了适当序列化器的 RedisTemplate。 例如,如果存储是使用出站 Redis 适配器写入的,且该适配器的 extract-payload-elements 设置为 false,则您必须提供如下配置的 RedisTemplatespring-doc.cadn.net.cn

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
    <property name="keySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
    <property name="hashKeySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
</bean>

The RedisTemplate uses String serializers for keys and hash keys and the default JDK Serialization serializers for values and hash values.spring-doc.cadn.net.cn

因为 key 具有字面量值,所以前面的示例相对简单且静态。 有时,您可能需要根据某些条件在运行时更改键的值。 为此,请使用 key-expression 代替,其中提供的表达式可以是任何有效的 SpEL 表达式。spring-doc.cadn.net.cn

此外,您可能希望对在从 Redis 集合中成功读取的数据进行一些后处理。 例如,您可能希望在数据被处理后将其移动或删除。 您可以通过使用 Spring Integration 2.2 中添加的事务同步功能来实现这一点。 以下示例使用了 key-expression 和事务同步:spring-doc.cadn.net.cn

<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
        connection-factory="redisConnectionFactory"
        key-expression="'presidents'"
        channel="otherRedisChannel"
        auto-startup="false"
        collection-type="ZSET">
            <int:poller fixed-rate="1000" max-messages-per-poll="2">
                <int:transactional synchronization-factory="syncFactory"/>
            </int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

您可以通过使用 transactional 元素将您的轮询器声明为事务性的。 该元素可以引用一个真实的事务管理器(例如,如果流程的其他部分调用了 JDBC)。 如果您没有“真实”的事务,可以使用 o.s.i.transaction.PseudoTransactionManager,它是 Spring 的 PlatformTransactionManager 的一个实现,并在没有实际事务时启用 Redis 适配器的事务同步功能。spring-doc.cadn.net.cn

这并不会使 Redis 活动本身具有事务性。 它允许在成功(提交)之前或之后,或在失败(回滚)之后执行操作同步。

一旦您的轮询器变为事务性的,您就可以在transactional元素上设置o.s.i.transaction.TransactionSynchronizationFactory的实例。TransactionSynchronizationFactory 创建 TransactionSynchronization 的实例。为了您的便利,我们提供了一个默认的基于 SpEL 的TransactionSynchronizationFactory,它允许您配置 SpEL 表达式,并且其执行与事务协调(同步)。支持 before-commit、after-commit 和 after-rollback 表达式,以及对应的通道(每种事件类型一个),用于发送评估结果(如有)。对于每个子元素,您可以指定 expressionchannel 属性。如果仅存在channel属性,则接收到的消息将作为特定同步场景的一部分发送到该处。如果仅存在 expression 属性且表达式的结果为非空值,则会生成一条以该结果为有效负载的消息,并将其发送到默认通道 (NullChannel),同时出现在日志中(级别为 DEBUG)。如果您希望评估结果发送到特定通道,请添加 channel 属性。如果表达式的结果为 null 或 void,则不会生成任何消息。spring-doc.cadn.net.cn

The RedisStoreMessageSource adds a store attribute with a RedisStore instance bound to the transaction IntegrationResourceHolder, which can be accessed from a TransactionSynchronizationProcessor implementation.spring-doc.cadn.net.cn

有关事务同步的更多信息,请参见 事务同步spring-doc.cadn.net.cn

RedisStore 出站通道适配器

RedisStore 出站通道适配器允许您将消息负载写入 Redis 集合,如下例所示:spring-doc.cadn.net.cn

<int-redis:store-outbound-channel-adapter id="redisListAdapter"
          collection-type="LIST"
          channel="requestChannel"
          key="myCollection" />

上述配置通过使用 store-inbound-channel-adapter 元素为 Redis 存储出站通道适配器进行设置。 它为各种属性提供值,例如:spring-doc.cadn.net.cn

  • key or key-expression: 正在使用的集合的键的名称。spring-doc.cadn.net.cn

  • extract-payload-elements: 如果设置为 true(默认值)且负载是“多值”对象(即 CollectionMap)的实例,则使用“addAll”和“putAll”语义进行存储。 否则,如果设置为 false,则无论其类型如何,负载都作为单个条目进行存储。 如果负载不是“多值”对象的实例,则此属性的值将被忽略,并且负载始终作为单个条目进行存储。spring-doc.cadn.net.cn

  • collection-type: 此适配器支持的 Collection 类型的枚举。 支持的集合包括 LISTSETZSETPROPERTIESMAPspring-doc.cadn.net.cn

  • map-key-expression: 返回待存储条目键名的 SpEL 表达式。 仅当 collection-typeMAPPROPERTIES 且 'extract-payload-elements' 为 false 时生效。spring-doc.cadn.net.cn

  • connection-factory: 对 o.s.data.redis.connection.RedisConnectionFactory 实例的引用。spring-doc.cadn.net.cn

  • redis-template: 对 o.s.data.redis.core.RedisTemplate 实例的引用。spring-doc.cadn.net.cn

  • 其他所有入站适配器(如'channel')共有的属性。spring-doc.cadn.net.cn

您不能同时设置 redis-templateconnection-factory
默认情况下,适配器使用 StringRedisTemplate。 这为键、值、哈希键和哈希值使用 StringRedisSerializer 个实例。 然而,如果将 extract-payload-elements 设置为 false,则将使用一个具有 StringRedisSerializer 个键和哈希键实例以及 JdkSerializationRedisSerializer 个值和哈希值实例的 RedisTemplate。 使用 JDK 序列化器时,需要理解的是:Java 序列化用于所有值,无论该值是否实际上是一个集合。 如果您需要对值的序列化有更多的控制,请考虑提供您自己的 RedisTemplate,而不是依赖这些默认值。

由于它具有key和其他属性的字面量值,前面的示例相对简单且静态。 有时,您可能需要根据某些条件在运行时动态更改这些值。 为此,请使用它们的-expression等效项(key-expressionmap-key-expression等),其中提供的表达式可以是任何有效的SpEL表达式。spring-doc.cadn.net.cn

Redis 出站命令网关

Spring Integration 4.0 引入了 Redis 命令网关,允许您通过通用的 RedisConnection#execute 方法执行任何标准的 Redis 命令。 以下列表显示了 Redis 出站网关的可用属性:spring-doc.cadn.net.cn

<int-redis:outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        redis-template=""  (6)
        arguments-serializer=""  (7)
        command-expression=""  (8)
        argument-expressions=""  (9)
        use-command-variable=""  (10)
        arguments-strategy="" /> (11)
1 从该端点接收 Message 个实例的 MessageChannel
2 The MessageChannel where this endpoint sends reply Message instances.
3 指定此出站网关是否必须返回非空值。 它默认为 true。 当 Redis 返回 null 值时,将抛出 ReplyRequiredException
4 等待回复消息发送的超时时间(以毫秒为单位)。 通常应用于基于队列的受限回复通道。
5 RedisConnectionFactory bean 的引用。 默认值为 redisConnectionFactory。 它与 'redis-template' 属性互斥。
6 RedisTemplate bean 的引用。 它与 'connection-factory' 属性互斥。
7 org.springframework.data.redis.serializer.RedisSerializer 实例的引用。 在必要时,它用于将每个命令参数序列化为 byte[]。
8 返回命令键的 SpEL 表达式。 它默认为 redis_command 消息头。 它不能求值为 null
9 以逗号分隔的 SpEL 表达式,将作为命令参数进行求值。 与 arguments-strategy 属性互斥。 如果您未提供任何属性,则使用 payload 作为命令参数。 参数表达式可以求值为 'null',以支持可变数量的参数。
10 一个 boolean 标志,用于指定当配置了 argument-expressions 时,评估后的 Redis 命令字符串是否作为 #cmd 变量在 o.s.i.redis.outbound.ExpressionArgumentsStrategy 的表达式求值上下文中可用。 否则,此属性将被忽略。
11 o.s.i.redis.outbound.ArgumentsStrategy 实例的引用。 它与 argument-expressions 属性互斥。 如果您未提供任一属性,则使用 payload 作为命令参数。

您可以使用 <int-redis:outbound-gateway> 作为通用组件来执行任何所需的 Redis 操作。 下面的示例展示了如何从 Redis 原子计数器获取递增的值:spring-doc.cadn.net.cn

<int-redis:outbound-gateway request-channel="requestChannel"
    reply-channel="replyChannel"
    command-expression="'INCR'"/>

Message 负载的名称应为 redisCounter,该名称可由 org.springframework.data.redis.support.atomic.RedisAtomicInteger bean 定义提供。spring-doc.cadn.net.cn

The RedisConnection#execute 方法以其泛型 Object 作为返回类型。 实际结果取决于命令类型。 例如,MGET 返回一个 List<byte[]>。 有关命令、其参数和结果类型的更多信息,请参见 Redis 规范spring-doc.cadn.net.cn

Redis 队列出站网关

Spring Integration 引入了 Redis 队列出站网关,以执行请求和回复场景。 它将一个对话 UUID 推送到提供的 queue,将带有该 UUID 作为键的值推送到 Redis 列表,并等待来自键为 UUID 加上 .reply 的 Redis 列表的回复。 每次交互使用不同的 UUID。 以下列表显示了 Redis 出站网关的可用属性:spring-doc.cadn.net.cn

<int-redis:queue-outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        extract-payload=""/>  (9)
1 从该端点接收 Message 个实例的 MessageChannel
2 The MessageChannel where this endpoint sends reply Message instances.
3 指定此出站网关是否必须返回非空值。 默认情况下,该值为 false。 否则,当 Redis 返回 null 值时,将抛出 ReplyRequiredException
4 等待回复消息发送的超时时间(以毫秒为单位)。 通常应用于基于队列的受限回复通道。
5 RedisConnectionFactory bean 的引用。 默认值为 redisConnectionFactory。 它与 'redis-template' 属性互斥。
6 发送到出站网关的 Redis 列表的名称 UUID
7 当注册多个网关时,此出站网关的优先级顺序。
8 引用 RedisSerializer bean。 它可以为空字符串,表示“无序列化器”。 在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 负载发送到 channel。 默认情况下,它是一个 JdkSerializationRedisSerializer
9 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。 如果将此属性设置为 true,则 serializer 不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。

Redis 队列入站网关

Spring Integration 4.1 引入了 Redis 队列入站网关,用于执行请求和回复场景。 它从提供的 queue 中弹出一个对话 UUID,从 Redis 列表中弹出以该 UUID 为键的值,并将回复推送到 Redis 列表,其键为 UUID 加上 .reply。 以下代码清单展示了 Redis 队列入站网关的可用属性:spring-doc.cadn.net.cn

<int-redis:queue-inbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        executor=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        receive-timeout=""  (9)
        expect-message=""  (10)
        recovery-interval=""/>  (11)
1 The MessageChannel where this endpoint sends Message instances created from the Redis data.
2 从该端点等待回复的 MessageChannelMessage 个实例。 可选 - replyChannel 标头仍在使用中。
3 对 Spring TaskExecutor(或标准 JDK Executor)Bean 的引用。 它用于底层监听任务。 默认值为 SimpleAsyncTaskExecutor
4 等待回复消息发送的超时时间(以毫秒为单位)。 通常应用于基于队列的受限回复通道。
5 RedisConnectionFactory bean 的引用。 默认值为 redisConnectionFactory。 它与 'redis-template' 属性互斥。
6 用于对话的 Redis 列表的名称 UUID
7 当注册多个网关时,此入站网关的优先级顺序。
8 RedisSerializer Bean 的引用。 它可以是空字符串,表示“无序列化器”。 在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 有效载荷发送给 channel。 默认情况下为 JdkSerializationRedisSerializer。 (注意:在 4.3 版本之前的发行版中,默认值为 StringRedisSerializer。 若要恢复该行为,请提供一个对 StringRedisSerializer 的引用)。
9 等待获取接收消息的超时时间(以毫秒为单位)。 它通常应用于基于队列的受限请求通道。
10 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。 如果将此属性设置为 true,则 serializer 不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。
11 监听器任务在“右弹出”操作发生异常后,重启前应休眠的时间(以毫秒为单位)。
The task-executor has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the RedisQueueMessageDrivenEndpoint tries to restart the listener task after an error. The errorChannel can be used to process those errors, to avoid restarts, but it is preferable to not expose your application to the possible deadlock situation. See Spring Framework Reference Manual for possible TaskExecutor implementations.

Redis Stream 出站通道适配器

Spring Integration 5.4 引入了响应式 Redis Stream 出站通道适配器,用于将消息负载写入 Redis 流。 出站通道适配器使用 ReactiveStreamOperations.add(…​) 向流中添加 Record。 以下示例展示了如何使用 Java 配置和 Service 类来配置 Redis Stream 出站通道适配器。spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
        new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
    reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
    reactiveStreamMessageHandler.setExtractPayload(true); (4)
    return reactiveStreamMessageHandler;
}
1 使用 ReactiveRedisStreamMessageHandler 和流名称创建 ReactiveRedisConnectionFactory 的实例以添加记录。 另一个构造函数变体基于 SpEL 表达式,用于针对请求消息评估流键。
2 设置 RedisSerializationContext 用于在将记录键和值添加到流之前对其进行序列化。
3 设置 HashMapper,该设置提供 Java 类型与 Redis 哈希/映射之间的契约。
4 如果为 'true',通道适配器将从请求消息中提取负载以添加流记录。 或者将整个消息作为值使用。 其默认值为 true

Redis Stream 入站通道适配器

Spring Integration 5.4 引入了响应式流入站通道适配器,用于从 Redis Stream 读取消息。 入站通道适配器根据自动确认标志使用 StreamReceiver.receive(…​)StreamReceiver.receiveAutoAck() 来从 Redis Stream 读取记录。 以下示例展示了如何使用 Java 配置 Redis Stream 入站通道适配器。spring-doc.cadn.net.cn

@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
       ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
            new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    messageProducer.setStreamReceiverOptions( (2)
                StreamReceiver.StreamReceiverOptions.builder()
                      .pollTimeout(Duration.ofMillis(100))
                      .build());
    messageProducer.setAutoStartup(true); (3)
    messageProducer.setAutoAck(false); (4)
    messageProducer.setCreateConsumerGroup(true); (5)
    messageProducer.setConsumerGroup("my-group"); (6)
    messageProducer.setConsumerName("my-consumer"); (7)
    messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
    messageProducer.setReadOffset(ReadOffset.latest()); (9)
    messageProducer.extractPayload(true); (10)
    return messageProducer;
}
1 使用 ReactiveRedisConnectionFactory 和流键创建 ReactiveRedisStreamMessageProducer 的实例以读取记录。
2 一个使用响应式基础设施消费 Redis 流的 StreamReceiver.StreamReceiverOptions
3 一个 SmartLifecycle 属性,用于指定该端点在应用上下文启动后是否自动启动。 默认值为 true。 若设置为 false,则需手动启动 RedisStreamMessageProducermessageProducer.start()
4 如果为 false,则接收的消息不会自动确认。 消息的确认将延迟到消费该消息的客户端进行。 默认值为 true
5 如果为 true,将创建一个消费者组。 在创建消费者组时,也会创建消费流(如果尚未存在)。 消费者组负责跟踪消息传递并区分不同的消费者。 默认值为 false
6 设置消费者组名称。 它默认为定义的 Bean 名称。
7 设置消费者名称。 从组 my-group 读取消息为 my-consumer
8 用于从此端点发送消息的消息通道。
9 定义读取消息的偏移量。 默认值为 ReadOffset.latest()
10 如果为 Record,则通道适配器将从 Record 中提取负载值。 否则,整个 Record 将作为负载使用。 其默认值为 true

如果将 autoAck 设置为 false,则 Redis Stream 中的 Record 不会由 Redis 驱动程序自动确认,而是会在消息中添加一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头,以便与作为值的 SimpleAcknowledgment 实例一起生产。 当基于该记录完成消息的业务逻辑时,目标集成流有责任调用其 acknowledge() 回调。 即使在反序列化过程中发生异常且配置了 errorChannel 的情况下,也需要类似的逻辑。 因此,目标错误处理器必须决定对该失败消息进行确认(ack)或否定确认(nack)。 除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 之外,ReactiveRedisStreamMessageProducer 还会将这些头填充到要生产的消息中:RedisHeaders.STREAM_KEYRedisHeaders.STREAM_MESSAGE_IDRedisHeaders.CONSUMER_GROUPRedisHeaders.CONSUMERspring-doc.cadn.net.cn

从 5.5 版本开始,您可以在 StreamReceiver.StreamReceiverOptionsBuilder 上显式配置 ReactiveRedisStreamMessageProducer 个选项,包括新引入的 onErrorResume 函数。如果 Redis Stream 消费者在发生反序列化错误时应继续轮询,则必须使用该函数。 默认函数会将消息发送到错误通道(如果提供),并如上所述对失败的消息进行可能的确认。 所有这些 StreamReceiver.StreamReceiverOptionsBuilder 与外部提供的 StreamReceiver.StreamReceiverOptions 互斥。spring-doc.cadn.net.cn

Redis 锁注册表

Spring Integration 4.0 引入了 RedisLockRegistry。 某些组件(例如聚合器和重排序器)使用从 LockRegistry 实例获取的锁,以确保同一时间只有一个线程操作一个组。 DefaultLockRegistry 在单个组件内执行此功能。 现在您可以为这些组件配置外部锁注册表。 当您将它与共享的 MessageGroupStore 配合使用时,可以使用 RedisLockRegistry 在多个应用实例之间提供此功能,从而确保同一时间只有一个实例能操作该组。spring-doc.cadn.net.cn

当一个锁被本地线程释放后,另一个本地线程通常可以立即获取该锁。 如果一个锁是由使用不同注册表实例的线程释放的,则获取该锁可能需要长达100毫秒的时间。spring-doc.cadn.net.cn

为了避免“挂起”的锁(当服务器失败时),此注册表中的锁在默认 60 秒后过期,但您可以在注册表中配置该值。 锁通常持有的时间要短得多。spring-doc.cadn.net.cn

由于键可能会过期,尝试解锁过期的锁会导致抛出异常。 然而,此类锁所保护的资源可能已受到损害,因此此类异常应被视为严重问题。 您应将过期时间设置为足够大的值以防止这种情况发生,但也要将其设置得足够低,以便在服务器故障后能在合理的时间内恢复锁。

从 5.0 版本开始,RedisLockRegistry 实现了 ExpirableLockRegistry,该功能会移除超过 age 时间前获取且当前未锁定的锁。spring-doc.cadn.net.cn

从版本 5.5.6 开始,RedisLockRegistry 支持通过 RedisLockRegistry.setCacheCapacity() 自动清理 RedisLockRegistry.locks 中 RedisLocks 的缓存。 请参阅其 JavaDocs 以获取更多信息。spring-doc.cadn.net.cn

从版本 5.5.13 开始,RedisLockRegistry 暴露了一个 setRedisLockType(RedisLockType) 选项,用于确定 Redis 锁获取应在哪种模式下进行:spring-doc.cadn.net.cn

  • RedisLockType.SPIN_LOCK - 锁通过周期性循环(100ms)检查是否可获取而获得。 默认值。spring-doc.cadn.net.cn

  • RedisLockType.PUB_SUB_LOCK - 锁已通过 Redis 发布/订阅订阅获得。spring-doc.cadn.net.cn

发布 - 订阅模式是首选方式——客户端与 Redis 服务器之间的网络通信更少,性能更高;当其他进程通过订阅通知解锁时,锁会立即被获取。 然而,Redis 在 Master/Replica 连接中不支持发布 - 订阅(例如在 AWS ElastiCache 环境中),因此默认选择忙等待模式,以确保注册表在任何环境下都能正常工作。spring-doc.cadn.net.cn

从 6.4 版本开始,当锁的所有权过期时,RedisLockRegistry.RedisLock.unlock() 方法不再抛出 IllegalStateException,而是抛出 ConcurrentModificationExceptionspring-doc.cadn.net.cn

从 6.4 版本开始,会添加一个 RedisLockRegistry.setRenewalTaskScheduler() 参数来配置用于定期续期锁的调度器。 当设置该参数后,在成功获取锁之后,锁将在过期时间的每 1/3 时刻自动续期,直到锁被释放或 Redis 键被移除。spring-doc.cadn.net.cn

AWS ElastiCache for Valkey 在集群模式下的支持

从版本 6.4.9/6.5.4/7.0.0 开始,RedisLockRegistry 支持 AWS Elasticache for Valkey 的集群模式。 在此版本的 Valkey(Redis 的即插即用替代品)中,所有 PubSub 操作(PUBLISHSUBSCRIBE 等)在内部都使用其分片变体(SPUBLISHSSUBSCRIBE 等)。 如果您观察到如下形式的错误:spring-doc.cadn.net.cn

Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Script attempted to access keys that do not hash to the same slot script: b2dedc0ab01c17f9f20e3e6ddb62dcb6afbed0bd, on @user_script:3.

unlock 步骤的 RedisLockRegistry 中,您必须提供一个包含哈希标签 {…​} 的锁键,以确保 unlock 脚本中的所有操作都哈希到相同的集群槽/分片,例如:spring-doc.cadn.net.cn

RedisLockRegistry lockRegistry = new RedisLockRegistry("my-lock-key{choose_your_tag}");

lockRegistry.lock();
# critical section
lockRegistry.unlock();