如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

注解支持

除了支持配置消息端点的 XML 命名空间外,您还可以使用注解。 首先,Spring Integration 提供了类级别的 @MessageEndpoint 作为 stereotypes 注解,这意味着它自身被 Spring 的 @Component 注解所标注,因此会被 Spring 的组件扫描自动识别为 bean 定义。spring-doc.cadn.net.cn

更为重要的是各种方法级注解。 它们表明被注解的方法能够处理消息。 以下示例展示了类级和方法级注解:spring-doc.cadn.net.cn

@MessageEndpoint
public class FooService {

    @ServiceActivator
    public void processMessage(Message message) {
        ...
    }
}

方法“处理”Message 的确切含义取决于具体的注解。 Spring Integration 中可用的注解包括:spring-doc.cadn.net.cn

如果您将 XML 配置与注解结合使用,则不需要 @MessageEndpoint 注解。 如果您想从 <service-activator/> 元素的 ref 属性配置 POJO 引用,您可以仅提供方法级注解。 在这种情况下,即使 <service-activator/> 元素上没有方法级属性,该注解也能防止歧义。

在大多数情况下,注解处理器方法不需要将 Message 类型作为其参数。 相反,方法参数类型可以匹配消息的有效负载类型,如下例所示:spring-doc.cadn.net.cn

public class ThingService {

    @ServiceActivator
    public void bar(Thing thing) {
        ...
    }

}

当方法参数需要从MessageHeaders中的值进行映射时,另一个选项是使用参数级别的@Header注解。 通常,使用Spring Integration注解的方法可以接受Message本身、消息负载或一个头值(配合@Header)作为参数。 事实上,该方法可以接受组合形式,如下面的示例所示:spring-doc.cadn.net.cn

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
        ...
    }

}

您还可以使用 @Headers 注解将所有消息头作为 Map 提供,如下例所示:spring-doc.cadn.net.cn

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}
注解的值也可以是一个 SpEL 表达式(例如,someHeader.toUpperCase()),这在您希望在注入之前操作头值时非常有用。 它还提供可选的 required 属性,用于指定该属性值是否必须在请求头中可用。 required 属性的默认值为 true

对于其中一些注解,当消息处理方法返回非空值时,端点会尝试发送回复。 这在两种配置选项(命名空间注解)中是一致的:使用此类端点的输出通道(如果可用),并将REPLY_CHANNEL消息头值作为后备。spring-doc.cadn.net.cn

端点上的输出通道组合与回复通道消息头启用了流水线方法,其中多个组件具有输出通道,且最终组件允许将回复消息转发到回复通道(如原始请求消息中所指定)。 换句话说,最终组件依赖于原始发送者提供的信息,并因此可以动态支持任意数量的客户端。 这是返回地址模式的一个示例。

除了这里展示的示例外,这些注解还支持 inputChanneloutputChannel 属性,如下例所示:spring-doc.cadn.net.cn

@Service
public class ThingService {

    @ServiceActivator(inputChannel="input", outputChannel="output")
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}

这些注解的处理会创建与相应的 XML 组件相同的 Bean:​AbstractEndpoint 个实例和 MessageHandler 个实例(对于入站通道适配器则为 MessageSource 个实例)。 请参见 @Bean 方法上的注解。 Bean 的名称遵循以下模式生成:[componentName].[methodName].[decapitalizedAnnotationClassShortName]。 在前面的示例中,AbstractEndpoint 的 Bean 名称为 thingService.otherThing.serviceActivator,而 MessageHandler(即 MessageSource)Bean 的名称则在此基础上附加了 .handler(即 .source)后缀。 可以通过在消息注解旁使用 @EndpointId 注解来自定义此类名称。 MessageHandler 实例(即 MessageSource 实例)也可由 消息历史记录 进行跟踪。spring-doc.cadn.net.cn

从 4.0 版本开始,所有消息传递注解都提供了 SmartLifecycle 选项(autoStartupphase),以允许在应用程序上下文初始化时控制端点的生命周期。 它们分别默认为 true0。 若要更改端点的状态(例如 start()stop()),您可以通过使用 BeanFactory(或自动注入)获取对端点 bean 的引用并调用相应方法。 或者,您可以向 控制总线 发送命令消息。 出于这些目的,您应使用上一段中提到的 beanNamespring-doc.cadn.net.cn

在解析提到的注解后(当未配置特定通道 bean 时),自动创建的通道及其对应的消费者端点,将在上下文初始化的末尾被声明为 beans。 这些 beans 可以在其他服务中通过自动注入使用,但必须标记@Lazy注解,因为在正常的自动注入处理期间,这些定义通常还不可用。spring-doc.cadn.net.cn

@Autowired
@Lazy
@Qualifier("someChannel")
MessageChannel someChannel;
...

@Bean
Thing1 dependsOnSPCA(@Qualifier("someInboundAdapter") @Lazy SourcePollingChannelAdapter someInboundAdapter) {
    ...
}

从版本 6.0 开始,所有消息传递注解现在都是 @Repeatable,因此可以在同一个服务方法上声明多个相同类型的注解,其含义是创建与这些注解重复次数一样多的端点:spring-doc.cadn.net.cn

@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
    return input.toUpperCase();
}

使用@Poller注解

在 Spring Integration 4.0 之前,消息注解要求 inputChannel 必须是对 SubscribableChannel 的引用。 对于 PollableChannel 实例,需要一个 <int:bridge/> 元素来配置 <int:poller/>,并使复合端点成为 PollingConsumer。 版本 4.0 引入了 @Poller 注解,允许直接在消息注解上配置 poller 属性,如下例所示:spring-doc.cadn.net.cn

public class AnnotationService {

    @Transformer(inputChannel = "input", outputChannel = "output",
        poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
    public String handle(String payload) {
        ...
    }
}

@Poller注解仅提供简单的PollerMetadata选项。 您可以使用属性占位符配置@Poller注解的属性(maxMessagesPerPollfixedDelayfixedRatecron)。 此外,从版本5.1开始,也提供了用于PollingConsumerreceiveTimeout选项。 如果有必要提供更多轮询选项(例如transactionadvice-chainerror-handler等),您应将PollerMetadata配置为通用bean,并将其bean名称用作@Pollervalue属性。 在这种情况下,不允许设置其他属性(它们必须在PollerMetadata bean上指定)。 注意,如果inputChannel是一个PollableChannel且未配置@Poller,则将使用默认PollerMetadata(如果它存在于应用上下文中)。 要使用@Configuration注解声明默认轮询器,请使用如下代码示例:spring-doc.cadn.net.cn

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}

下面的示例展示了如何使用默认轮询器:spring-doc.cadn.net.cn

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
    public String handle(String payload) {
        ...
    }
}

以下示例展示了如何使用命名轮询器:spring-doc.cadn.net.cn

@Bean
public PollerMetadata myPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(1000));
    return pollerMetadata;
}

以下示例展示了一个使用默认轮询器的端点:spring-doc.cadn.net.cn

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
                           poller = @Poller("myPoller"))
    public String handle(String payload) {
         ...
    }
}

从版本 4.3.3 开始,@Poller 注解具有 errorChannel 属性,以便更轻松地配置底层的 MessagePublishingErrorHandler。 此属性的作用与 <poller> XML 组件中的 error-channel 相同。 有关更多信息,请参阅端点命名空间支持spring-doc.cadn.net.cn

消息注解上的 poller() 属性与 reactive() 属性互斥。 更多信息请参见下一节。spring-doc.cadn.net.cn

使用@Reactive注解

ReactiveStreamsConsumer自版本5.0以来就已存在,但仅当端点的输入通道是FluxMessageChannel(或任何org.reactivestreams.Publisher实现)时才应用。 从版本5.3开始,当目标消息处理器为ReactiveMessageHandler时,框架也会创建其实例,与输入通道类型无关。 @Reactive子注解(类似于上述提到的@Poller)自版本5.5起已引入所有消息注解中。 它接受一个可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> Bean引用,并且无论输入通道类型和消息处理器如何,都将目标端点转换为ReactiveStreamsConsumer实例。 该函数由Flux.transform()操作符使用,以对来自输入通道的响应式流源应用某些自定义(publishOn()doOnNext()log()retry()等)。spring-doc.cadn.net.cn

以下示例演示了如何独立于最终订阅者和生产者,将输入通道的发布线程更改为 DirectChannelspring-doc.cadn.net.cn

@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
    return flux -> flux.publishOn(Schedulers.parallel());
}

@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
    ...
}

消息注解上的 reactive() 属性与 poller() 属性互斥。 有关更多信息,请参阅 使用 @Poller 注解响应式流支持spring-doc.cadn.net.cn

使用@InboundChannelAdapter注解

版本 4.0 引入了 @InboundChannelAdapter 方法级注解。 它基于 MethodInvokingMessageSource 为一个被注解的方法生成一个 SourcePollingChannelAdapter 集成组件。 该注解是 <int:inbound-channel-adapter> XML 组件的等价物,具有相同的限制:方法不能有参数,且返回类型不能为 void。 它有两个属性:value(必需的 MessageChannel Bean 名称)和 poller(可选的 @Poller 注解,如前文所述)。 如果您需要提供某些 MessageHeaders,请使用 Message<?> 返回类型并使用 MessageBuilder 来构建 Message<?>。 使用 MessageBuilder 允许您配置 MessageHeaders。 以下示例展示了如何使用 @InboundChannelAdapter 注解:spring-doc.cadn.net.cn

@InboundChannelAdapter("counterChannel")
public Integer count() {
    return this.counter.incrementAndGet();
}

@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
    return "foo";
}

版本 4.3 为 value 注解属性引入了 channel 别名,以提供更好的源代码可读性。 此外,目标 MessageChannel Bean 是在第一次 receive() 调用时通过提供的名称(由 outputChannelName 选项设置)在 SourcePollingChannelAdapter 中解析的,而不是在初始化阶段。这使得“延迟绑定”逻辑成为可能:从消费者角度来看,目标 MessageChannel Bean 的创建和注册比 @InboundChannelAdapter 解析阶段稍晚一些。spring-doc.cadn.net.cn

第一个示例要求默认轮询器已在应用程序上下文中其他地方声明。spring-doc.cadn.net.cn

使用 @MessagingGateway 注解spring-doc.cadn.net.cn

使用@IntegrationComponentScan注解

标准的 Spring Framework @ComponentScan 注解不会扫描具有 stereotype @Component 注解的接口。 为了克服这一限制并允许配置 @MessagingGateway(参见 @MessagingGateway 注解),我们引入了 @IntegrationComponentScan 机制。 此注解必须与 @Configuration 注解一起放置,并自定义以定义其扫描选项, 例如 basePackagesbasePackageClasses。 在这种情况下,所有带有 @MessagingGateway 注解的发现接口都会被解析并注册为 GatewayProxyFactoryBean 实例。 所有其他基于类的组件则由标准的 @ComponentScan 进行解析。spring-doc.cadn.net.cn