|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
注解支持
除了支持配置消息端点的 XML 命名空间外,您还可以使用注解。
首先,Spring Integration 提供了类级别的 @MessageEndpoint 作为 stereotypes 注解,这意味着它自身被 Spring 的 @Component 注解所标注,因此会被 Spring 的组件扫描自动识别为 bean 定义。
更为重要的是各种方法级注解。 它们表明被注解的方法能够处理消息。 以下示例展示了类级和方法级注解:
@MessageEndpoint
public class FooService {
@ServiceActivator
public void processMessage(Message message) {
...
}
}
方法“处理”Message 的确切含义取决于具体的注解。 Spring Integration 中可用的注解包括:
-
@Aggregator(参见 聚合器) -
@Filter(参见 过滤器) -
@Router(参见 路由器) -
@ServiceActivator(参见 服务激活器) -
@Splitter(见 拆分器) -
@Transformer(参见 转换器) -
@InboundChannelAdapter(参见 通道适配器) -
@BridgeFrom(参见 使用 Java 配置配置桥接) -
@BridgeTo(参见 使用 Java 配置配置桥接) -
@MessagingGateway(参见 消息网关) -
@IntegrationComponentScan(参见 配置和@EnableIntegration)
如果您将 XML 配置与注解结合使用,则不需要 @MessageEndpoint 注解。
如果您想从 <service-activator/> 元素的 ref 属性配置 POJO 引用,您可以仅提供方法级注解。
在这种情况下,即使 <service-activator/> 元素上没有方法级属性,该注解也能防止歧义。 |
在大多数情况下,注解处理器方法不需要将 Message 类型作为其参数。
相反,方法参数类型可以匹配消息的有效负载类型,如下例所示:
public class ThingService {
@ServiceActivator
public void bar(Thing thing) {
...
}
}
当方法参数需要从MessageHeaders中的值进行映射时,另一个选项是使用参数级别的@Header注解。
通常,使用Spring Integration注解的方法可以接受Message本身、消息负载或一个头值(配合@Header)作为参数。
事实上,该方法可以接受组合形式,如下面的示例所示:
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
...
}
}
您还可以使用 @Headers 注解将所有消息头作为 Map 提供,如下例所示:
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
注解的值也可以是一个 SpEL 表达式(例如,someHeader.toUpperCase()),这在您希望在注入之前操作头值时非常有用。
它还提供可选的 required 属性,用于指定该属性值是否必须在请求头中可用。
required 属性的默认值为 true。 |
对于其中一些注解,当消息处理方法返回非空值时,端点会尝试发送回复。
这在两种配置选项(命名空间注解)中是一致的:使用此类端点的输出通道(如果可用),并将REPLY_CHANNEL消息头值作为后备。
| 端点上的输出通道组合与回复通道消息头启用了流水线方法,其中多个组件具有输出通道,且最终组件允许将回复消息转发到回复通道(如原始请求消息中所指定)。 换句话说,最终组件依赖于原始发送者提供的信息,并因此可以动态支持任意数量的客户端。 这是返回地址模式的一个示例。 |
除了这里展示的示例外,这些注解还支持 inputChannel 和 outputChannel 属性,如下例所示:
@Service
public class ThingService {
@ServiceActivator(inputChannel="input", outputChannel="output")
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
这些注解的处理会创建与相应的 XML 组件相同的 Bean:AbstractEndpoint 个实例和 MessageHandler 个实例(对于入站通道适配器则为 MessageSource 个实例)。
请参见 @Bean 方法上的注解。
Bean 的名称遵循以下模式生成:[componentName].[methodName].[decapitalizedAnnotationClassShortName]。
在前面的示例中,AbstractEndpoint 的 Bean 名称为 thingService.otherThing.serviceActivator,而 MessageHandler(即 MessageSource)Bean 的名称则在此基础上附加了 .handler(即 .source)后缀。
可以通过在消息注解旁使用 @EndpointId 注解来自定义此类名称。
MessageHandler 实例(即 MessageSource 实例)也可由 消息历史记录 进行跟踪。
从 4.0 版本开始,所有消息传递注解都提供了 SmartLifecycle 选项(autoStartup 和 phase),以允许在应用程序上下文初始化时控制端点的生命周期。
它们分别默认为 true 和 0。
若要更改端点的状态(例如 start() 或 stop()),您可以通过使用 BeanFactory(或自动注入)获取对端点 bean 的引用并调用相应方法。
或者,您可以向 控制总线 发送命令消息。
出于这些目的,您应使用上一段中提到的 beanName。
|
在解析提到的注解后(当未配置特定通道 bean 时),自动创建的通道及其对应的消费者端点,将在上下文初始化的末尾被声明为 beans。
这些 beans 可以在其他服务中通过自动注入使用,但必须标记
|
从版本 6.0 开始,所有消息传递注解现在都是 @Repeatable,因此可以在同一个服务方法上声明多个相同类型的注解,其含义是创建与这些注解重复次数一样多的端点:
@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
return input.toUpperCase();
}
使用@Poller注解
在 Spring Integration 4.0 之前,消息注解要求 inputChannel 必须是对 SubscribableChannel 的引用。
对于 PollableChannel 实例,需要一个 <int:bridge/> 元素来配置 <int:poller/>,并使复合端点成为 PollingConsumer。
版本 4.0 引入了 @Poller 注解,允许直接在消息注解上配置 poller 属性,如下例所示:
public class AnnotationService {
@Transformer(inputChannel = "input", outputChannel = "output",
poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
public String handle(String payload) {
...
}
}
@Poller注解仅提供简单的PollerMetadata选项。
您可以使用属性占位符配置@Poller注解的属性(maxMessagesPerPoll、fixedDelay、fixedRate和cron)。
此外,从版本5.1开始,也提供了用于PollingConsumer的receiveTimeout选项。
如果有必要提供更多轮询选项(例如transaction、advice-chain、error-handler等),您应将PollerMetadata配置为通用bean,并将其bean名称用作@Poller的value属性。
在这种情况下,不允许设置其他属性(它们必须在PollerMetadata bean上指定)。
注意,如果inputChannel是一个PollableChannel且未配置@Poller,则将使用默认PollerMetadata(如果它存在于应用上下文中)。
要使用@Configuration注解声明默认轮询器,请使用如下代码示例:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
下面的示例展示了如何使用默认轮询器:
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
public String handle(String payload) {
...
}
}
以下示例展示了如何使用命名轮询器:
@Bean
public PollerMetadata myPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(1000));
return pollerMetadata;
}
以下示例展示了一个使用默认轮询器的端点:
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
poller = @Poller("myPoller"))
public String handle(String payload) {
...
}
}
从版本 4.3.3 开始,@Poller 注解具有 errorChannel 属性,以便更轻松地配置底层的 MessagePublishingErrorHandler。
此属性的作用与 <poller> XML 组件中的 error-channel 相同。
有关更多信息,请参阅端点命名空间支持。
消息注解上的 poller() 属性与 reactive() 属性互斥。
更多信息请参见下一节。
使用@Reactive注解
ReactiveStreamsConsumer自版本5.0以来就已存在,但仅当端点的输入通道是FluxMessageChannel(或任何org.reactivestreams.Publisher实现)时才应用。
从版本5.3开始,当目标消息处理器为ReactiveMessageHandler时,框架也会创建其实例,与输入通道类型无关。
@Reactive子注解(类似于上述提到的@Poller)自版本5.5起已引入所有消息注解中。
它接受一个可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> Bean引用,并且无论输入通道类型和消息处理器如何,都将目标端点转换为ReactiveStreamsConsumer实例。
该函数由Flux.transform()操作符使用,以对来自输入通道的响应式流源应用某些自定义(publishOn()、doOnNext()、log()、retry()等)。
以下示例演示了如何独立于最终订阅者和生产者,将输入通道的发布线程更改为 DirectChannel:
@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
return flux -> flux.publishOn(Schedulers.parallel());
}
@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
...
}
消息注解上的 reactive() 属性与 poller() 属性互斥。
有关更多信息,请参阅 使用 @Poller 注解 和 响应式流支持。
使用@InboundChannelAdapter注解
版本 4.0 引入了 @InboundChannelAdapter 方法级注解。
它基于 MethodInvokingMessageSource 为一个被注解的方法生成一个 SourcePollingChannelAdapter 集成组件。
该注解是 <int:inbound-channel-adapter> XML 组件的等价物,具有相同的限制:方法不能有参数,且返回类型不能为 void。
它有两个属性:value(必需的 MessageChannel Bean 名称)和 poller(可选的 @Poller 注解,如前文所述)。
如果您需要提供某些 MessageHeaders,请使用 Message<?> 返回类型并使用 MessageBuilder 来构建 Message<?>。
使用 MessageBuilder 允许您配置 MessageHeaders。
以下示例展示了如何使用 @InboundChannelAdapter 注解:
@InboundChannelAdapter("counterChannel")
public Integer count() {
return this.counter.incrementAndGet();
}
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
return "foo";
}
版本 4.3 为 value 注解属性引入了 channel 别名,以提供更好的源代码可读性。
此外,目标 MessageChannel Bean 是在第一次 receive() 调用时通过提供的名称(由 outputChannelName 选项设置)在 SourcePollingChannelAdapter 中解析的,而不是在初始化阶段。这使得“延迟绑定”逻辑成为可能:从消费者角度来看,目标 MessageChannel Bean 的创建和注册比 @InboundChannelAdapter 解析阶段稍晚一些。
第一个示例要求默认轮询器已在应用程序上下文中其他地方声明。
使用 @MessagingGateway 注解
使用@IntegrationComponentScan注解
标准的 Spring Framework @ComponentScan 注解不会扫描具有 stereotype @Component 注解的接口。
为了克服这一限制并允许配置 @MessagingGateway(参见 @MessagingGateway 注解),我们引入了 @IntegrationComponentScan 机制。
此注解必须与 @Configuration 注解一起放置,并自定义以定义其扫描选项,
例如 basePackages 和 basePackageClasses。
在这种情况下,所有带有 @MessagingGateway 注解的发现接口都会被解析并注册为 GatewayProxyFactoryBean 实例。
所有其他基于类的组件则由标准的 @ComponentScan 进行解析。