|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
消息发布
面向切面编程(AOP)的消息发布功能允许您将消息的构建和发送作为方法调用的副产品。例如,假设您有一个组件,每当该组件的状态发生变化时,您都希望收到一条消息通知。 发送此类通知最简单的方法是向专用通道发送消息,但如何将改变对象状态的方法调用与消息发送过程连接起来,以及如何结构化通知消息呢? AOP 消息发布功能通过基于配置的方法处理这些职责。
消息发布配置
Spring Integration 提供了两种方法:XML 配置和基于注解(Java)的配置。
使用注解驱动的 Spring 框架配置@Publisher注解
注解驱动方法允许您使用 @Publisher 注解来标注任意方法,以指定 'channel' 属性。
从 5.1 版本开始,要启用此功能,您必须在某个 @Configuration 类上使用 @EnablePublisher 注解。
有关更多信息,请参阅 配置和 @EnableIntegration。
消息是根据方法调用的返回值构建的,并发送到由 'channel' 属性指定的通道。
为了进一步管理消息结构,您还可以结合使用 @Payload 和 @Header 注解。
在内部,Spring Integration 的此消息发布功能通过定义 PublisherAnnotationAdvisor 并使用 Spring 表达式语言 (SpEL) 来利用 Spring AOP,从而让您对发布的 Message 结构拥有相当大的灵活性和控制力。
The PublisherAnnotationAdvisor 定义并绑定以下变量:
-
#return: 绑定到返回值,允许您引用它或其属性(例如#return.something,其中 'something' 是绑定到#return的对象的属性) -
#exception: 绑定到方法调用时抛出的异常(如果存在) -
#args: 绑定到方法参数,以便您可以按名称提取各个参数(例如,#args.fname)
考虑以下示例:
@Publisher
public String defaultPayload(String fname, String lname) {
return fname + " " + lname;
}
在上面的示例中,消息是按以下结构构建的:
-
消息负载是方法的返回类型和值。 这是默认设置。
-
新建的消息被发送到默认发布器通道,该通道已配置了注解后置处理器(本节稍后将介绍)。
以下示例与前面的示例相同,只是它不使用默认发布通道:
@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
return fname + " " + lname;
}
不使用默认的发布通道,我们通过设置@Publisher注解的'channel'属性来指定发布通道。
我们还添加了一个@Header注解,这使得名为'last'的消息头具有与'lname'方法参数相同的值。
该消息头被添加到新构建的消息中。
以下示例与前面的示例几乎完全相同:
@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
return fname + " " + lname;
}
唯一的区别在于我们在方法上使用了 @Payload 注解,以明确指定方法的返回值应作为消息的有效负载(payload)使用。
以下示例通过在使用 Spring Expression Language 的 @Payload 注解中进一步说明框架应如何构建消息,从而扩展了先前的配置:
@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
return fname + " " + lname;
}
在前面的示例中,消息是方法调用返回值与 'lname' 输入参数的拼接结果。 名为 'x' 的 Message 头部的值由 'num' 输入参数确定。 该头部被添加到新构建的消息中。
@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
return fname + " " + lname;
}
在上面的示例中,您看到了 @Payload 注解的另一种用法。
这里,我们为一个方法参数添加注解,该参数将成为新构建消息的有效负载(payload)。
与 Spring 中大多数其他基于注解的功能一样,您需要注册一个后置处理器 (PublisherAnnotationBeanPostProcessor)。
以下示例展示了如何操作:
<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>
对于更简洁的配置,您可以改用命名空间支持,如下例所示:
<int:annotation-config>
<int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>
对于 Java 配置,您必须使用 @EnablePublisher 注解,如下例所示:
@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
...
}
从版本 5.1.3 开始,<int:enable-publisher> 组件以及 @EnablePublisher 注解均具备 proxy-target-class 和 order 属性,用于调整 ProxyFactory 配置。
与其他 Spring 注解(@Component、@Scheduled等)类似,您也可以将@Publisher用作元注解。
这意味着您可以定义自己的注解,这些注解的处理方式与@Publisher本身相同。
以下示例展示了如何实现这一点:
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}
在上面的示例中,我们定义了 @Audit 注解,该注解本身又被 @Publisher 注解所修饰。
另外请注意,您可以在元注解上定义一个 channel 属性,以封装消息在此注解内部发送的位置。
现在,您可以像以下示例所示,使用 @Audit 注解来标注任意方法:
@Audit
public String test() {
return "Hello";
}
在前面的示例中,每次调用 test() 方法都会生成一条消息,其负载由该方法的返回值创建。
每条消息都被发送到名为 auditChannel 的通道。
这种技术的一个好处是,您可以避免在多个注解中重复相同的通道名称。
此外,您还可以在您自己的(可能是特定于领域的)注解与框架提供的注解之间提供一层间接性。
您也可以对类进行注解,这将允许您将该注解的属性应用于该类的每个公共方法,如下例所示:
@Audit
static class BankingOperationsImpl implements BankingOperations {
public String debit(String amount) {
. . .
}
public String credit(String amount) {
. . .
}
}
基于 XML 的方法<publishing-interceptor>元素
基于 XML 的方法允许您配置与命名空间配置的 MessagePublishingInterceptor 相同的基于 AOP 的消息发布功能。
它确实比基于注解的方法有一些优势,因为它允许您使用 AOP 切点表达式,从而可能一次性拦截多个方法,或拦截并发布您没有源代码的方法。
要通过 XML 配置消息发布,您只需执行以下两项操作:
-
使用
<publishing-interceptor>XML 元素为MessagePublishingInterceptor提供配置。 -
提供 AOP 配置以将
MessagePublishingInterceptor应用于受管对象。
下面的示例展示了如何配置一个 publishing-interceptor 元素:
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
<method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" value="something"/>
</method>
<method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" expression="'something'.toUpperCase()"/>
</method>
<method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>
The <publishing-interceptor> 配置看起来与基于注解的方法相当相似,并且它也利用了 Spring 表达式语言(Spring Expression Language)的强大功能。
在上面的示例中,echo 方法在 testBean 上执行后,会渲染出具有如下结构的 Message:
-
Message负载的类型为String,其内容如下:Echoing: [value],其中value是执行方法后返回的值。 -
The
Message有一个标题,其名称为things,值为something。 -
The
Messageis sent toechoChannel.
第二种方法与第一种非常相似。
在这里,每个以'repl'开头的方法都会渲染一个具有以下结构的Message:
-
The
Messagepayload is the same as in the preceding sample. -
Message有一个名为things的头部,其值为 SpEL 表达式'something'.toUpperCase()的结果。 -
The
Messageis sent toechoChannel.
第二种方法,映射任何以 echoDef 开头的方法的执行,会产生一个具有如下结构的 Message:
-
Message负载是执行方法后返回的值。 -
由于未提供
channel属性,因此将Message发送给由publisher定义的defaultChannel。
对于简单的映射规则,您可以依赖publisher默认值,如下例所示:
<publishing-interceptor id="anotherInterceptor"/>
上述示例将匹配切点表达式的每个方法的返回值映射到有效负载,并发送至 default-channel。
如果您未指定 defaultChannel(如上述示例所示),则消息将被发送至全局 nullChannel(相当于 /dev/null)。
异步发布
发布操作与组件的执行在同一个线程中进行。 因此,默认情况下它是同步的。 这意味着整个消息流必须等待发布者的流程完成。 然而,开发者通常希望完全相反:利用此消息发布功能来启动异步流程。 例如,您可能托管一个服务(HTTP、WS 等),用于接收远程请求。 您或许希望将此请求内部转发到一个可能需要较长时间处理的流程中。 但同时,您可能也希望立即向用户返回响应。 因此,与其将传入的请求发送到输出通道(这是传统方式)进行处理,不如使用 'output-channel' 或 'replyChannel' 头信息,向调用方发送一个简单的确认类回复,同时利用消息发布功能来启动复杂的流程。
以下示例中的服务接收一个复杂的有效负载(需要进一步发送处理),但同时也需要向调用方回复一个简单的确认:
public String echo(Object complexPayload) {
return "ACK";
}
因此,我们不是将复杂的流程连接到输出通道,而是使用消息发布功能。
我们通过使用服务方法的输入参数(如前一个示例所示)来配置它以创建一个新消息,并将其发送到“localProcessChannel”。
为了确保该流程是异步的,我们只需将其发送到任何类型的异步通道(下一个示例中的ExecutorChannel)即可。
以下示例展示了如何创建异步的publishing-interceptor:
<int:service-activator input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>
<bean id="sampleService" class="test.SampleService"/>
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>
<int:publishing-interceptor id="interceptor" >
<int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
<int:header name="sample_header" expression="'some sample value'"/>
</int:method>
</int:publishing-interceptor>
<int:channel id="localProcessChannel">
<int:dispatcher task-executor="executor"/>
</int:channel>
<task:executor id="executor" pool-size="5"/>
处理此类场景的另一种方式是使用线形监听器(wire-tap)。 参见 Wire Tap。
基于计划触发器生成和发布消息
在前面的章节中,我们探讨了消息发布功能,该功能作为方法调用的副产品来构建并发布消息。
然而,在这些情况下,您仍然负责调用该方法。
Spring Integration 2.0 通过为' inbound-channel-adapter '元素添加新的expression属性,支持了定时消息生产者和发布者。
您可以基于多种触发器进行调度,其中任意一种均可在' poller '元素中进行配置。
目前,我们支持cron、fixed-rate、fixed-delay以及由您实现并通过' trigger '属性值引用的任何自定义触发器。
如前所述,计划生产者(producers)和发布者(publishers)的支持通过 <inbound-channel-adapter> XML 元素提供。
考虑以下示例:
<int:inbound-channel-adapter id="fixedDelayProducer"
expression="'fixedDelayTest'"
channel="fixedDelayChannel">
<int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>
上述示例创建了一个入站通道适配器,该适配器构建一个Message,其负载为在expression属性中定义的表达式的结果。
每当发生由fixed-delay属性指定的延迟时,就会创建并发送此类消息。
下面的示例与前面的示例类似,不同之处在于它使用了 fixed-rate 属性:
<int:inbound-channel-adapter id="fixedRateProducer"
expression="'fixedRateTest'"
channel="fixedRateChannel">
<int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
fixed-rate 属性允许您以固定速率发送消息(从每个任务的开始时间进行测量)。
下面的示例展示了如何在 cron 属性中指定值来应用 Cron 触发器:
<int:inbound-channel-adapter id="cronProducer"
expression="'cronTest'"
channel="cronChannel">
<int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>
以下示例展示了如何向消息中插入额外的头信息:
<int:inbound-channel-adapter id="headerExpressionsProducer"
expression="'headerExpressionsTest'"
channel="headerExpressionsChannel"
auto-startup="false">
<int:poller fixed-delay="5000"/>
<int:header name="foo" expression="6 * 7"/>
<int:header name="bar" value="x"/>
</int:inbound-channel-adapter>
额外的消息头可以接受标量值或 Spring 表达式求值的结果。
如果您需要实现自己的自定义触发器,可以使用 trigger 属性来引用任何实现了 org.springframework.scheduling.Trigger 接口的 Spring 配置 Bean。
以下示例展示了如何实现:
<int:inbound-channel-adapter id="triggerRefProducer"
expression="'triggerRefTest'" channel="triggerRefChannel">
<int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>
<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
<beans:constructor-arg value="9999"/>
</beans:bean>