|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
配置消息通道
要创建消息通道实例,你可以使用<频道/>元素用于XML或直达频道Java配置实例,具体如下:
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
当你使用<频道/>没有任何子元素,它生成一个直达频道实例(a订阅频道).
要创建发布-订阅频道,请使用<发布-订阅-频道/>元素(该发布订阅频道在爪哇语中),具体如下:
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
你也可以选择多种<队列/>子元素用于创建任意可轮询的信道类型(如消息信道实现中描述)。
以下章节展示了每种信道类型的示例。
直达频道配置
如前所述,直达频道是默认类型。
以下列表展示了谁应该定义该职位:
-
Java
-
XML
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
默认信道具有轮询负载均衡器,并且启用故障切换(参见直达频道更多细节)。
要禁用其中一个或两个,请添加一个<调度员/>子元素(a负载均衡策略构造者直达频道)并配置属性如下:
-
Java
-
XML
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
数据类型通道配置
有时,消费者只能处理特定类型的负载,迫使你确保输入消息的负载类型。 我首先想到的可能是使用消息过滤器。 然而,该消息过滤器只能过滤掉不符合消费者需求的消息。 另一种方法是使用基于内容的路由器,将不合规数据类型的消息路由到特定的变换器,以强制转换和转换到所需的数据类型。 这确实可行,但更简单的方法是应用数据类型通道模式。 你可以为每个特定的有效载荷数据类型使用不同的数据类型通道。
要创建只接受包含某一有效载荷类型的消息的数据类型通道,请在通道元素中提供该数据类型的完全限定类名称数据类型属性,如下例所示:
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
注意,类型检查能通过任何可分配给通道数据类型的类型。
换句话说,数字频道在前述示例中,将接受有效载荷为java.lang.整数或java.lang.Double.
多种类型可以作为逗号分隔列表提供,如下示例所示:
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,前述示例中的“numberChannel”只接受数据类型为java.lang.Number.
但如果消息的有效载荷不是所需类型,会发生什么?
这取决于你是否定义了一个名为integrationConversionService这是Spring的转换服务的一个实例。
如果不行,则例外会立即被抛弃。
然而,如果你定义了integrationConversionServiceBEAN,它用于尝试将消息的有效载荷转换为可接受的类型。
你甚至可以注册自定义转换器。
例如,假设你发送一条包含字符串有效载荷传输到我们上面配置的“numberChannel”。
你可以这样处理这条信息:
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,这会是完全合法的作。 然而,由于我们使用Datatype Channel,这种作的结果会产生类似以下异常:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
例外发生是因为我们要求有效载荷类型为数但我们发了字符串.
所以我们需要一个东西来转换字符串转给数.
为此,我们可以实现类似以下示例的转换器:
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然后我们可以将其注册为积分转换服务的转换器,如下示例所示:
-
Java
-
XML
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者在字符串到整数转换器当它被标记为@Component自动扫描注释。
当解析“转换器”元素时,它生成了integrationConversionService如果还没有定义的话,那就是豆子。
有了那个转换器,那个发送作将成功,因为数据类型通道利用该转换器进行转换字符串有效载荷到整数.
有关有效载荷类型转换的更多信息,请参见有效载荷类型转换。
从4.0版本开始,integrationConversionService由默认数据类型通道消息转换器,它在应用上下文中查找转换服务。
要使用不同的转换技术,你可以指定消息转换器在频道上有属性。
这一定是对消息转换器实现。
只有发件消息采用方法。
它为转换器提供了对消息头部的访问(以防转换过程中可能需要从头部获取信息,例如内容类型).
该方法只能返回已转换的有效载荷或完整的有效载荷消息对象。
如果是后者,转换器必须小心复制所有入站消息的头部。
或者,你可以声明<豆/>类型消息转换器其标识为数据类型通道消息转换器该转换器被所有具有数据类型.
队列通道配置
要创建一个队列通道,使用<队列/>子元素。
您可以指定频道容量如下:
-
Java
-
XML
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果你没有为“容量”属性提供值,那就要说明<队列/>子元素,生成的队列是无界的。
为避免内存不足等问题,我们强烈建议你为有界队列设置一个显式值。 |
持续队列通道配置
自队列通道它提供了缓存消息的能力,但默认仅在内存中进行,同时引入了在系统故障时消息可能丢失的可能性。
为了降低这种风险,一个队列通道可能由持久实现支持,MessageGroupStore策略界面。
欲了解更多详情MessageGroupStore和消息商店,请参见消息商店。
这能力当消息存储属性被使用。 |
当队列通道获得消息,它将消息添加到消息存储中。
当消息是从一个队列通道,它会从消息存储中移除。
默认情况下,队列通道将消息存储在内存队列中,这可能导致前述的消息丢失情景。
然而,Spring 集成提供了持久存储,例如Jdbc频道消息存储.
你可以为任何配置消息存储队列通道通过添加消息存储属性,如下例所示:
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(见下方示例,了解 Java/Kotlin 配置选项。)
Spring Integration JDBC 模块还为多种流行数据库提供了模式数据定义语言(DDL)。
这些模式位于该模块的 org.springframework.integration.jdbc.store.channel 包中(Spring-integration-JDBC).
一个重要特性是,对于任何事务型持久存储(例如Jdbc频道消息存储),只要轮询器配置好交易,从存储中移除的消息只有在交易成功完成后才能永久移除。
否则,交易会回滚,且消息没有丢失。 |
随着越来越多的 Spring 项目与“NoSQL”数据存储相关的支持,消息存储的许多其他实现也随之出现。
你也可以提供你自己的实现MessageGroupStore如果找不到满足你具体需求的,可以直接做Interface。
自4.0版本起,我们建议队列通道实例配置为使用频道信息存储,如果可能的话。
这些通常针对此用途进行了优化,相较于通用消息存储。
如果频道信息存储是频道优先信息存储,消息按优先级顺序以先进先出(FIFO)接收。
优先级的概念由消息存储的实现决定。
例如,以下示例展示了MongoDB信道消息存储的Java配置:
-
Java
-
Java DSL
-
Kotlin DSL
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
注意消息组队列类。
那是阻塞队列实现以使用MessageGroupStore操作。 |
另一个定制选项队列通道环境由裁判属性<智:队列>子元素或其特定构造子。
该属性为任意提供引用java.util.Queue实现。
例如,Hazelcast 分布式IQue可以配置如下:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
发布订阅频道配置
要创建一个发布订阅频道,使用<发布-订阅频道/>元素。
使用该元素时,你还可以指定任务执行者用于发布消息(若未指定,则发布在发送方线程中),具体如下:
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果你在下游提供重序列器或聚合器发布订阅频道你可以在通道上设置“应用序列”属性为true.
这样做表示通道应设置序列大小和序列号消息头部以及传递消息前的相关ID。
例如,如果有五个订阅者,则序列大小将被设置为5,消息则会序列号头部值范围为1自5.
同时执行者,你也可以配置错误处理程序.
默认情况下,发布订阅频道使用消息发布错误处理实现时发送错误到消息频道来自errorChannel标题或进入全局errorChannel实例。
如果执行者未配置,错误处理程序被忽略,异常会直接抛入调用者的线程。
如果你提供重序器或聚合下游发布订阅频道你可以在通道上设置“应用序列”属性为true.
这样做意味着信道应在传递消息前设置序列大小和序列号头部以及相关ID。
例如,如果有五个订阅者,序列大小将设置为5,消息的序列号头值为1自5.
以下示例展示了如何设置应用序列头部 至true:
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
这应用序列值为false默认允许发布-订阅信道向多个出站信道发送完全相同的消息实例。
由于Spring Integration强制有效载荷和头部引用的不可变性,当标志设置为true,信道生成新的消息具有相同有效载荷引用但头部值不同的实例。 |
从5.4.3版本开始,发布订阅频道也可以配置为要求订阅者其选项广播调度员以表明该频道在没有订阅者时不会无声无视消息。
一个MessageDispatchingException其中调度员没有用户当没有订阅者时,会抛出消息,且该选项设置为true.
执行者频道
要创建一个执行者频道,添加<调度员>具有 的子元素任务执行者属性。
该属性的值可以引用任意任务执行者在上下文中。
例如,这样做可以配置线程池,用于向订阅的处理器发送消息。
如前所述,这样做会破坏发送端和接收端之间的单线程执行上下文,使得调用处理程序时不会共享任何活跃事务上下文(即处理程序可以抛出例外,但发送祈祷已经成功回归)。
以下示例展示了如何使用调度元素 并指定执行者任务执行者属性:
-
Java
-
XML
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
这
|
优先频道配置
要创建一个优先频道,使用<优先队列/>子元素,如下示例所示:
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
默认情况下,频道会查询优先权消息的头部。
不过,你也可以选择定制比较仪参考。
另外,请注意优先频道(和其他类型一样)确实支持数据类型属性。
与队列通道,它还支持能力属性。
以下示例展示了所有这些:
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
会合频道配置
一个会合频道当队列子元素是 时创建的<集合队列>.
它不提供任何额外的配置选项,且其队列不接受任何容量值,因为它是一个零容量的直接切换队列。
以下示例展示了如何声明会合频道:
-
Java
-
XML
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
通道拦截器配置
消息信道也可能包含拦截器,如信道拦截器中所述。
这<拦截机/>子元素可以添加到<频道/>(或者更具体的元素类型)。
你可以提供裁判属性以引用任何实现通道拦截者界面,如下示例所示:
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
一般来说,我们建议在独立位置定义拦截器实现,因为它们通常提供可在多个信道间重复使用的共同行为。
全局通道拦截器配置
信道拦截器为每个信道应用交叉截切行为提供了一种简洁简洁的方法。 如果在多个信道上也应用相同的行为,为每个信道配置同一套拦截器并不是最高效的方式。 为了避免重复配置,同时允许拦截器应用于多个信道,Spring Integration提供了全局拦截器。 请考虑以下两个例子:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每<通道拦截器/>元素允许你定义一个全局拦截器,该拦截器应用在所有符合由模式属性。
在前述情况下,全局拦截器应用在“thing1”信道以及所有以“thing2”或“input”开头的信道上,但不应用于以“thing3”开头的信道(自版本5.0起)。
将这种语法加入模式会带来一个可能(虽然可能性不大)的问题。
如果你有一颗叫做!thing1你包含了一个模式!thing1在你的频道拦截机模式模式,已经不匹配了。
现在图案与所有未命名的豆子相匹配东西1.
在这种情况下,你可以跳出!在与 的模式中。
图案\\!事情1匹配一个名为!thing1. |
顺序属性允许你管理在同一信道上有多个拦截器时,该拦截器被注入的位置。 例如,信道“inputChannel”可以在本地配置独立拦截器(见下文),如下示例所示:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一个合理的问题是:“全局拦截器是如何与本地配置的其他拦截器相较的,或者通过其他全局拦截器定义进行注入?”
当前实现提供了定义拦截器执行顺序的简单机制。
在次序属性保证拦截器在现有拦截器之后注入,负数则表示拦截器在现有拦截器之前注入。
这意味着在前述例子中,全局拦截器在(因为其次序大于0)本地配置的“窃听”拦截器。
如果存在另一个全局拦截器与匹配的模式其阶数通过比较两个拦截器的数值确定次序属性。
要在现有拦截器之前注入全局拦截器,使用负值次序属性。
注意次序和模式属性是可选的。
默认值次序将为0,且模式,默认为“*”(用于匹配所有通道)。 |
窃听
如前所述,Spring Integration 提供了一个简单的窃听器。
你可以在<拦截机/>元素。
这样做对调试尤其有用,可以与 Spring Integration 的日志通道适配器配合使用,具体如下:
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
“logging-channel-adapter”还接受“expression”属性,以便你可以根据“payload”和“headers”变量评估SpEL表达式。
或者,记录完整消息toString()结果,给出一个值true用于“log-full-message”属性。
默认情况下,它是false这样只记录有效载荷。
设置为true除了有效载荷外,还支持对所有头部进行日志记录。
“表达式”选项提供了最大的灵活性(例如,expression=“payload.user.name”). |
关于线控及其他类似组件(消息发布配置)的一个常见误解是它们本质上是自动异步的。 默认情况下,线路分流器作为组件不会异步调用。 相反,Spring Integration专注于配置异步行为的统一方法:消息通道。 使消息流某些部分是同步或异步的,取决于该流中配置的消息通道类型。 这是消息通道抽象的主要优势之一。 自框架诞生以来,我们一直强调信息渠道作为框架一等公民的必要性和价值。 它不仅仅是EIP模式的内部隐式实现。 它作为一个可配置组件完全向终端用户开放。 因此,窃听组件仅负责执行以下任务:
-
通过接入通道拦截消息流(例如,
频道A) -
抓取每条消息
-
将消息发送到另一个信道(例如,
频道B)
它本质上是桥式模式的变体,但封装在通道定义中(因此更容易启用和禁用而不破坏流)。 此外,与桥接器不同,它基本上是分叉了另一条消息流。 这是同步流还是异步流?答案取决于“channelB”属于哪种消息通道。 我们有以下选项:直接通道、可轮询通道和执行通道。 后两者突破线程边界,使得此类信道上的通信异步,因为该信道向其订阅处理程序发送消息的分发发生在与发送消息的线程不同的线程上。 这就是让你的线接流成为同步或异步的原因。 它与框架内的其他组件(如消息发布器)保持一致,通过避免你提前担心(除了编写线程安全代码外)某段代码应当实现同步还是异步,从而增加了一致性和简洁性。 两段代码(比如组件A和组件B)在消息通道上的实际连接,决定了它们的协作是同步或异步的。 你未来甚至可能想从同步切换到异步,消息通道让你可以快速切换,而无需作代码。
关于窃听的最后一点是,尽管上述理由说明默认不异步,但通常最好尽快将消息传递。 因此,使用异步信道选项作为窃听器的出站信道是相当常见的做法。 然而,异步行为并非默认强制执行。 如果我们这样做,会有许多用例出现问题,包括你可能不想打破事务边界。 也许你使用窃听模式进行审计,并且希望审计消息能在原始交易中发送。 举个例子,你可以把线路分接器连接到JMS的外呼通道适配器。 这样你就能兼顾两全:1)JMS消息的发送可以在事务中进行,同时2)仍是“发射后遗忘”作,从而避免主消息流中出现明显延迟。
从4.0版本开始,当拦截器(例如,窃听类)引用了一个频道。
你需要排除这些频道,不包括当前拦截器拦截的通道。
这可以通过适当的模式或程序化实现。
如果你有自定义通道拦截者这涉及一个渠道,考虑实现否决能力拦截者.
这样,框架会根据提供的模式询问拦截者是否可以拦截每个候选信道。
你也可以在拦截器方法中添加运行时保护,确保该信道不会被拦截器引用。
这窃听同时使用这两种技巧。 |
从4.3版本开始,窃听有额外的构造子,取频道名称而不是消息频道实例。
这对 Java 配置和使用通道自动创建逻辑时非常方便。
目标消息频道豆子由提供的频道名称后来,在与
拦截 器。
信道分辨率需要豆子工厂,因此线接实例必须是Spring管理的豆子。 |
这种晚绑定方法还允许简化典型的 Java DSL 配置窃听模式,如下示例所示:
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
条件线路分流器
通过使用选择器或选择表达式属性。
这选择器参考文献消息选择器BEAN,可以在运行时决定消息是否应进入分接通道。
同样,选择表达式是一个布尔SpEL表达式,具有相同目的:如果该表达式值为true,消息被发送到分接信道。
全局线分流配置
可以将全局窃听器配置配置为全局通道拦截器配置的特殊情况。
为此,需要配置一个顶层窃听元素。
现在,除了平常之外窃听命名空间支持,模式和次序属性得到支持,工作方式与通道拦截器.
以下示例展示了如何配置全局窃听器:
-
Java
-
XML
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局线分流器提供了一种方便的方式,可以在外部配置单信道窃听器,而无需修改现有信道配置。
为此,设置模式归属于目标频道名称。
例如,你可以利用该技术配置测试用例以验证信道上的消息。 |