|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
路由实现
由于基于内容的路由通常需要一些特定于领域的逻辑,大多数用例都要求通过使用XML命名空间支持或注解将控制权委托给POJO。这两种方法将在后面讨论。 然而,在此之前,我们首先介绍几种满足常见需求的实现。
PayloadTypeRouter
一个PayloadTypeRouter将消息发送到由payload-type映射定义的通道,如下例所示:
<bean id="payloadTypeRouter"
class="org.springframework.integration.router.PayloadTypeRouter">
<property name="channelMapping">
<map>
<entry key="java.lang.String" value-ref="stringChannel"/>
<entry key="java.lang.Integer" value-ref="integerChannel"/>
</map>
</property>
</bean>
配置PayloadTypeRouter也通过Spring Integration提供的命名空间(参见Namespace Support)所支持,这实际上通过将<router/>配置与其相应的实现(使用<bean/>元素定义)合并为一个更简洁的配置元素来简化了配置。
以下示例展示了一个等效于上述配置但利用了命名空间支持的PayloadTypeRouter配置:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>
以下示例显示了等效的路由器配置在Java中:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
当使用Java DSL时,有两个选项。
首先,您可以像前一个示例中所示的那样定义路由器对象:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
注意,路由器可以但不一定是一个@Bean。流会在它不是@Bean时进行注册。
第二步,您还可以在DSL流程本身中定义路由函数,如下例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}
HeaderValueRouter
一个HeaderValueRouter根据个别头部值映射将消息发送到通道。
当创建一个HeaderValueRouter时,它会被初始化为评估的头部名称。
头部的值可能是两件事情之一:
-
任意值
-
一个通道名称
如果是一个任意值,则需要额外的映射将这些头值与频道名称关联。 否则,不需要额外配置。
Spring Integration 提供了一种基于命名空间的 XML 配置来配置一个 HeaderValueRouter。
以下示例展示了当需要映射头值到通道时,HeaderValueRouter 的配置方式:
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>
在解析过程中,前面示例中定义的路由器可能会遇到通道解析失败的情况,导致异常。
如果您希望抑制此类异常并将未解析的消息发送到默认输出通道(通过default-output-channel属性标识),可以将resolution-required设置为false。
通常,当消息的头部值未显式映射到某个通道时,这些消息会被发送到 default-output-channel。
然而,如果头部值被映射到了一个通道名称但该通道无法解析,将 resolution-required 属性设置为 false 会导致这样的消息被路由到 default-output-channel。
以下示例显示了等效的路由器配置在Java中:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
当使用Java DSL时,有两种选项。 首先,您可以像前面的示例一样定义路由对象:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
注意,路由器可以但不一定是一个@Bean。流会在它不是@Bean时进行注册。
第二步,您还可以在DSL流程本身中定义路由函数,如下例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
m -> m
.channelMapping("someHeaderValue", "channelA")
.channelMapping("someOtherHeaderValue", "channelB"),
e -> e.id("headerValueRouter"))
.get();
}
配置中,不需要将header值映射到频道名称,因为header值本身即代表频道名称。 以下示例展示了一个无需将header值映射到频道名称的路由器:
<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>
|
自 Spring Integration 2.1 版本起,解析通道的行为更加明确。
例如,如果您省略了 基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。
如果真的要丢弃消息,你也必须将 |
RecipientListRouter
一个 RecipientListRouter 将接收到的每个消息发送到静态定义的消息通道列表。
以下示例创建了一个 RecipientListRouter:
<bean id="recipientListRouter"
class="org.springframework.integration.router.RecipientListRouter">
<property name="channels">
<list>
<ref bean="channel1"/>
<ref bean="channel2"/>
<ref bean="channel3"/>
</list>
</property>
</bean>
Spring Integration 还提供了对 RecipientListRouter 配置命名空间的支持(参见 命名空间支持),如下例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel"
timeout="1234"
ignore-send-failures="true"
apply-sequence="true">
<int:recipient channel="channel1"/>
<int:recipient channel="channel2"/>
</int:recipient-list-router>
以下示例显示了等效的路由器配置在Java中:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
RecipientListRouter router = new RecipientListRouter();
router.setSendTimeout(1_234L);
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channel1");
router.addRecipient("channel2");
router.addRecipient("channel3");
return router;
}
以下示例展示了使用Java DSL配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.routeToRecipients(r -> r
.applySequence(true)
.ignoreSendFailures(true)
.recipient("channel1")
.recipient("channel2")
.recipient("channel3")
.sendTimeout(1_234L))
.get();
}
这里的 'apply-sequence' 标志与在发布 - 订阅通道(publish-subscribe-channel)中的效果相同,并且与发布 - 订阅通道一样,它在 recipient-list-router 上默认处于禁用状态。
有关更多信息,请参阅 PublishSubscribeChannel 配置。 |
在配置RecipientListRouter时,另一个方便的选择是使用Spring Expression Language (SpEL) 支持作为个别接收通道的筛选器。
这样做类似于在一个‘链’的开头使用一个过滤器来充当“选择性消费者”。
不过,在这种情况下,所有这些功能都被简洁地合并到路由器的配置中,如下例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel">
<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>
在之前的配置中,由selector-expression属性标识的SpEL表达式被评估以确定此接收者是否应包含在一个给定输入消息的目的地列表中。
该表达式的评估结果必须为boolean。
如果没有定义此属性,则通道始终在目的地列表中。
RecipientListRouterManagement
自从版本4.1起,RecipientListRouter提供了一系列操作来动态地在运行时管理收件人。
这些管理操作通过RecipientListRouterManagement和@ManagedResource注解呈现。
它们可以通过使用控制总线以及通过JMX获得,如下例所示:
<control-bus input-channel="controlBus"/>
<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
<recipient channel="channel1"/>
</recipient-list-router>
<channel id="channel2"/>
Message<?> addRecipientCommandMessage =
MessageBuilder.withPayload("'simpleRouter.handler'.addRecipient")
.setHeader(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of("channel2"))
.build();
从应用程序启动开始,simpleRouter 只有一个 channel1 接收者。
但在执行 addRecipient 命令后,会添加一个 channel2 接收者。
这是一个“注册对消息部分内容感兴趣”的用例:我们可能在某个时间段内对来自路由器的消息感兴趣,因此订阅了 recipient-list-router,并在某个时刻决定取消订阅。
因为对<recipient-list-router>进行了运行时管理操作,所以可以从一开始就无需任何<recipient>进行配置。
在这种情况下,当没有任何匹配接收方的消息时,RecipientListRouter的行为与没有消息的情况相同。
如果配置了defaultOutputChannel,则将消息发送到那里。
否则,会抛出MessageDeliveryException。
xpath 路由
The XPath 路由器是XML模块的一部分。 见使用XPath路由XML消息.
路由与错误处理
Spring Integration 还提供一种特殊的基于类型的路由器,称为 ErrorMessageExceptionTypeRouter,用于路由错误消息(定义为 payload 为 Throwable 实例的消息)。
ErrorMessageExceptionTypeRouter 类似于 PayloadTypeRouter。
事实上,它们几乎完全相同。
唯一的区别在于,虽然 PayloadTypeRouter 遍历有效负载实例的实例层次结构(例如 payload.getClass().getSuperclass())以查找最具体的类型和通道映射,但 ErrorMessageExceptionTypeRouter 遍历"异常原因"的层次结构(例如 payload.getCause())以查找最具体的 Throwable 类型或通道映射,并使用 mappingClass.isInstance(cause) 将 cause 匹配到类或其任何超类。
在这种情况下,通道映射顺序非常重要。
因此,如果有获取IllegalArgumentException的映射需求但没有RuntimeException的需求,那么必须首先在路由器中配置最后一个。 |
自从版本 4.3 起,ErrorMessageExceptionTypeRouter 在初始化阶段加载所有映射类以在遇到问题时快速失败为 ClassNotFoundException。 |
The following example shows a sample configuration for ErrorMessageExceptionTypeRouter:
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
XML DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f
.routeByException(r -> r
.channelMapping(IllegalArgumentException.class, "illegalChannel")
.channelMapping(NullPointerException.class, "npeChannel")
.defaultOutputChannel("defaultChannel"));
}
@Bean
fun someFlow() =
integrationFlow {
routeByException {
channelMapping(IllegalArgumentException::class.java, "illegalChannel")
channelMapping(NullPointerException::class.java, "npeChannel")
defaultOutputChannel("defaultChannel")
}
}
@Bean
someFlow() {
integrationFlow {
routeByException {
channelMapping IllegalArgumentException, 'illegalChannel'
channelMapping NullPointerException, 'npeChannel'
defaultOutputChannel 'defaultChannel'
}
}
}
<int:exception-type-router input-channel="inputChannel"
default-output-channel="defaultChannel">
<int:mapping exception-type="java.lang.IllegalArgumentException"
channel="illegalChannel"/>
<int:mapping exception-type="java.lang.NullPointerException"
channel="npeChannel"/>
</int:exception-type-router>
<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />