|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
Spring 集成框架概述
Spring Integration 扩展了 Spring 编程模型,以支持众所周知的 企业集成模式。 它支持基于 Spring 的应用程序内部的轻量级消息传递,并通过声明式适配器支持与外部系统的集成。 这些适配器在 Spring 对远程调用、消息传递和调度的支持之上提供了更高级别的抽象。
Spring Integration 的主要目标是提供一个简单的模型,用于构建企业级集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。
Spring 集成概述
本章提供了 Spring Integration 核心概念和组件的高级介绍。 其中包括一些编程技巧,帮助您充分利用 Spring Integration。
背景
Spring 框架的核心主题之一是控制反转(IoC)。 在广义上,这意味着框架代表其上下文中管理的组件处理职责。 组件本身因此得以简化,因为它们不再需要承担这些职责。 例如,依赖注入使组件无需负责查找或创建其依赖项。 同样,面向切面编程通过将通用横切关注点模块化到可重用的切面中,从而减轻业务组件的负担。 在每种情况下,最终结果都是构建出一个更易于测试、理解、维护和扩展的系统。
此外,Spring 框架及其产品组合为企业级应用的构建提供了全面的编程模型。 开发人员得益于该模型的一致性,尤其是其基于成熟的最佳实践,例如面向接口编程以及优先采用组合而非继承。 Spring 简化的抽象和强大的支持库在提升开发者生产效率的同时,也增强了系统的可测试性和可移植性。
Spring Integration 秉承相同的目标和原则。 它将 Spring 编程模型扩展至消息领域,并基于 Spring 现有的企业集成支持,提供更高层次的抽象。 它支持消息驱动架构,其中控制反转适用于运行时关注点,例如某些业务逻辑何时执行以及响应应发送至何处。 它支持消息的路由和转换,从而能够整合不同的传输方式和数据格式,且不影响可测试性。 换句话说,消息和集成相关的职责均由框架处理。 业务组件进一步与基础设施隔离,开发者得以从复杂的集成责任中解脱出来。
作为 Spring 编程模型的扩展,Spring Integration 提供了多种配置选项,包括注解、支持命名空间的 XML、使用通用“bean”元素的 XML,以及直接使用底层 API。 该 API 基于明确定义的策略接口和非侵入式的委托适配器。 Spring Integration 的设计灵感源于对 Spring 中常见模式与 Gregor Hohpe 和 Bobby Woolf(Addison Wesley, 2004)在企业集成模式一书中描述的知名模式之间存在强烈亲和力的认识。 阅读过该书的开发者将能立即理解 Spring Integration 的概念和术语。
目标与原则
Spring Integration 的动机源于以下目标:
-
提供用于实现复杂企业集成解决方案的简单模型。
-
在基于 Spring 的应用程序中实现异步、消息驱动的行为。
-
促进现有 Spring 用户直观、渐进式地采用。
Spring Integration 遵循以下原则:
-
组件应松散耦合,以实现模块化和可测试性。
-
该框架应强制实现业务逻辑与集成逻辑之间的关注点分离。
-
扩展点应在本质上是抽象的(但在明确定义的边界内),以促进重用和可移植性。
主要组件
从垂直视角来看,分层架构有助于关注点分离,而层与层之间基于接口的契约促进了松耦合。基于 Spring 的应用程序通常按此方式设计,Spring 框架及其产品组合为遵循企业应用全栈最佳实践提供了坚实基础。基于消息的架构增加了横向视角,但这些目标仍然相关。正如“分层架构”是一种极其通用和抽象的范式,消息系统通常遵循同样抽象的“管道 - 过滤器”模型。“过滤器”代表任何能够生成或消费消息的组件,而“管道”则在过滤器之间传输消息,从而使组件本身保持松散耦合。值得注意的是,这两个高层范式并非相互排斥。支持“管道”的底层消息基础设施仍应封装在一个层中,其契约定义为接口。同样地,“过滤器”本身应在逻辑上位于应用程序服务层之上的层级中进行管理,并通过接口与这些服务进行交互,其方式与 Web 层非常相似。
消息
在 Spring Integration 中,消息是任何 Java 对象的通用包装器,并结合了框架在处理该对象时使用的元数据。 它由负载和头部组成。 负载可以是任何类型,而头部则包含常用的信息,如 ID、时间戳、关联 ID 和返回地址。 头部还用于在连接的传输之间传递值。 例如,当从接收到的文件创建消息时,文件名可能存储在头部中,以便下游组件访问。 同样,如果消息的内容最终将由出站邮件适配器发送,则各种属性(收件人、发件人、抄送、主题等)可由上游组件配置为消息头部值。 开发人员还可以在头部存储任意键值对。
消息通道
消息通道代表“管道与过滤器”架构中的“管道”。 生产者将消息发送到通道,消费者从通道接收消息。 因此,消息通道解耦了消息组件,并为消息的拦截和监控提供了便利的切入点。
消息通道可以遵循点对点或发布 - 订阅语义。 在点对点通道中,发送到通道的每条消息最多只能被一个消费者接收。 而发布 - 订阅通道则尝试将每条消息广播给通道上的所有订阅者。 Spring Integration 支持这两种模型。
虽然“点对点”和“发布-订阅”定义了每条消息最终由多少个消费者接收的两种选项,但还有一个重要的考虑因素:通道是否应该缓冲消息? 在 Spring Integration 中,可轮询通道能够在队列中缓冲消息。 缓冲的优势在于它可以限制传入消息的速率,从而防止消费者过载。 然而,顾名思义,这也增加了一些复杂性,因为只有在配置了轮询器的情况下,消费者才能从此类通道接收消息。 另一方面,连接到可订阅通道的消费者完全是消息驱动的。 消息通道实现详细讨论了 Spring Integration 中可用的各种通道实现。
消息端点
Spring Integration 的主要目标之一是通过控制反转简化企业级集成解决方案的开发。这意味着您无需直接实现消费者和生产者,甚至无需构建消息或在消息通道上调用发送或接收操作。相反,您应该能够专注于您的特定领域模型,并基于纯对象进行实现。然后,通过提供声明式配置,您可以将特定于领域的代码与 Spring Integration 提供的消息基础设施“连接”起来。负责这些连接的组件是消息端点。这并不意味着您必须直接将现有应用程序代码连接起来。任何现实世界的企业集成解决方案都需要一定数量的代码,专注于路由和转换等集成问题。关键在于实现集成逻辑与业务逻辑之间的关注点分离。换句话说,与 Web 应用程序的模型 - 视图 - 控制器(MVC)范式一样,目标应该是提供一个薄但专用的层,将传入请求转换为服务层调用,然后将服务层的返回值转换为传出响应。下一节将概述负责处理这些职责的消息端点类型,在接下来的章节中,您将看到 Spring Integration 的声明式配置选项如何提供一种非侵入式的方式来使用每一种端点。
消息端点
消息端点代表“管道 - 过滤器”架构中的“过滤器”。正如前面提到的,端点的主要作用是将应用程序代码连接到消息框架,并以非侵入式的方式实现。换句话说,应用程序代码理想情况下不应感知消息对象或消息通道。这类似于 MVC 范式中的控制器角色。正如控制器处理 HTTP 请求一样,消息端点处理消息。正如控制器映射到 URL 模式一样,消息端点也映射到消息通道。在两种情况下,目标都是相同的:将应用程序代码与基础设施隔离。这些概念以及随后讨论的所有模式均在《企业集成模式》一书中进行了详细阐述。在此,我们仅提供 Spring Integration 支持的主要端点类型的高级描述以及与之相关的角色。接下来的章节将详细阐述并提供示例代码以及配置示例。
消息转换器
消息转换器负责转换消息的内容或结构,并返回修改后的消息。
最常见的转换器类型之一是将消息的负载从一种格式转换为另一种格式(例如从 XML 转换为 java.lang.String)。
同样,转换器可以添加、删除或修改消息的头部值。
消息过滤器
消息过滤器用于确定消息是否应被传递到输出通道。
这通常需要一个布尔测试方法,该方法可以检查特定的有效负载内容类型、属性值、标头的存在或其他条件。
如果消息被接受,它将被发送到输出通道。
如果没有被接受,它将被丢弃(或者,对于更严格的实现,可能会抛出 Exception)。
消息过滤器通常与发布 - 订阅通道结合使用,在这种通道中,多个消费者可能接收相同的消息,并利用过滤器的条件来缩小待处理消息的范围。
| 请小心不要将“管道 - 过滤器”架构模式中“过滤器”的通用用法,与这种特定的端点类型混淆,后者用于有选择地筛选在两个通道之间流动的消息。 “管道 - 过滤器”概念中的“过滤器”更贴近 Spring Integration 的消息端点:任何可以连接到消息通道以发送或接收消息的组件。 |
消息路由器
消息路由器负责决定下一个应将消息发送到哪个通道或哪些通道(如果有的话)。 通常,该决策基于消息的内容或消息头中可用的元数据。 消息路由器通常用作服务激活器或其他能够发送回复消息的端点上静态配置输出通道的动态替代方案。 同样,如前所述,消息路由器为多个订阅者使用的被动消息过滤器提供了一种主动的替代方案。
聚合器
聚合器基本上是分割器的镜像,它是一种消息端点,接收多条消息并将它们组合成一条单一消息。
实际上,聚合器通常是包含分割器的管道中的下游消费者。
从技术上讲,聚合器比分割器更复杂,因为它需要维护状态(待聚合的消息),决定何时整个消息组已就绪,并在必要时处理超时。
此外,在发生超时的情况下,聚合器需要知道是发送部分结果、丢弃它们,还是将它们发送到单独的通道。
Spring Integration 提供了 CorrelationStrategy、ReleaseStrategy 以及可配置的超时设置、超时时是否发送部分结果,以及一个丢弃通道。
服务激活器
服务激活器是一个通用端点,用于将服务实例连接到消息系统。 必须配置输入消息通道,并且如果要调用的服务方法能够返回值,则还可以提供输出消息通道。
| 输出通道是可选的,因为每条消息也可以提供自己的“回复地址”头。 此规则同样适用于所有消费者端点。 |
服务激活器调用某个服务对象上的操作来处理请求消息,提取请求消息的负载,并在必要时进行转换(如果该方法不期望接收消息类型的参数)。 每当服务对象的方法返回一个值时,如果需要,该返回值也会被转换为回复消息(如果它本身还不是消息类型)。 该回复消息被发送到输出通道。 如果未配置输出通道,则回复消息将发送到消息中指定的“返回地址”所对应的通道(如果可用)。
请求 - 回复服务激活器端点将目标对象的方法连接到输入和输出消息通道。
| 如前所述,在 消息通道 中,通道可以是可轮询的或可订阅的。 在前面的图中,这通过“时钟”符号以及实线箭头(轮询)和虚线箭头(订阅)来表示。 |
通道适配器
通道适配器是一种端点,用于将消息通道连接到其他系统或传输协议。 通道适配器可以是入站或出站的。 通常,通道适配器会在消息与从其他系统接收或向其发送的任何对象或资源(如文件、HTTP 请求、JMS 消息等)之间进行某种映射。 根据传输协议的不同,通道适配器还可能填充或提取消息头值。 Spring Integration 提供了多种通道适配器,将在后续章节中进行描述。
MessageChannel。| 消息源可以是可轮询的(例如 POP3),也可以是消息驱动的(例如 IMAP Idle)。 在前面的图表中,这通过“时钟”符号以及实线箭头(轮询)和虚线箭头(消息驱动)来描绘。 |
MessageChannel 连接到目标系统。| 正如之前在消息通道中讨论的,通道可以是可轮询的或可订阅的。 在前面的图中,这通过“时钟”符号以及实线箭头(轮询)和虚线箭头(订阅)来表示。 |
端点 Bean 名称
消费端点(任何带有 inputChannel 的端点)由两个 Bean 组成:消费者和消息处理器。
消费者持有对消息处理器的引用,并在消息到达时调用它。
考虑以下 XML 示例:
<int:service-activator id = "someService" ... />
鉴于上述示例,bean 的名称如下:
-
消费者:
someService(即id) -
处理器:
someService.handler
当使用企业集成模式(EIP)注解时,名称取决于多个因素。 考虑以下带注解的 POJO 示例:
@Component
public class SomeComponent {
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
鉴于上述示例,bean 的名称如下:
-
消费者:
someComponent.someMethod.serviceActivator -
处理器:
someComponent.someMethod.serviceActivator.handler
从版本 5.0.4 开始,您可以使用 @EndpointId 注解来修改这些名称,如下例所示:
@Component
public class SomeComponent {
@EndpointId("someService")
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
鉴于上述示例,bean 的名称如下:
-
消费者:
someService -
处理器:
someService.handler
@EndpointId 创建的名称与使用 XML 配置的 id 属性创建的名称相同。
请考虑以下带注解的 Bean 示例:
@Configuration
public class SomeConfiguration {
@Bean
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
鉴于上述示例,bean 的名称如下:
-
消费者:
someConfiguration.someHandler.serviceActivator -
处理器:
someHandler(即@Bean名称)
从版本 5.0.4 开始,您可以使用 @EndpointId 注解来修改这些名称,如下例所示:
@Configuration
public class SomeConfiguration {
@Bean("someService.handler") (1)
@EndpointId("someService") (2)
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
| 1 | 处理器:someService.handler(Bean 名称) |
| 2 | 消费者:someService(端点 ID) |
@EndpointId 注解创建的名称与 XML 配置中 id 属性创建的名称一致,前提是遵循将 .handler 追加到 @Bean 名称的约定。
存在一种特殊情况,会创建第三个 Bean:出于架构原因,如果 MessageHandler @Bean 未定义 AbstractReplyProducingMessageHandler,框架会将提供的 Bean 包装在 ReplyProducingMessageHandlerWrapper 中。
该包装器支持请求处理器通知处理,并输出正常的“未产生回复”调试日志消息。
其 Bean 名称为处理器 Bean 名称加上 .wrapper(当存在 @EndpointId 时;否则,使用正常生成的处理器名称)。
同样地,可轮询消息源会创建两个 Bean:一个 SourcePollingChannelAdapter(SPCA)和一个 MessageSource。
考虑以下 XML 配置:
<int:inbound-channel-adapter id = "someAdapter" ... />
鉴于上述 XML 配置,Bean 的名称如下:
-
SPCA:
someAdapter(即id) -
处理器:
someAdapter.source
考虑以下用于定义@EndpointId的 POJO 的 Java 配置:
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
...
}
鉴于上述 Java 配置示例,Bean 名称如下:
-
SPCA:
someAdapter -
处理器:
someAdapter.source
考虑以下用于定义 @EndpointID 的 Java Bean 配置:
@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
return () -> {
...
};
}
鉴于上述示例,bean 的名称如下:
-
SPCA:
someAdapter -
处理器:
someAdapter.source(只要您遵循在@Bean名称后追加.source的约定)
配置和@EnableIntegration
在本文档中,您可以看到关于在 Spring Integration 流程中声明元素的 XML 命名空间支持的引用。
该支持由一系列命名空间解析器提供,这些解析器生成适当的 bean 定义以实现特定组件。
例如,许多端点由一个 MessageHandler bean 和一个 ConsumerEndpointFactoryBean 组成,其中注入了处理器和输入通道名称。
首次遇到 Spring Integration 命名空间元素时,框架会自动声明多个 Bean(包括任务调度器、隐式通道创建器等),以支持运行时环境。
版本 4.0 引入了 @EnableIntegration 注解,用于注册 Spring Integration 基础设施 Bean(参见 Javadoc)。
当仅使用 Java 配置时(例如与 Spring Boot 或 Spring Integration 消息注解支持以及不带 XML 集成配置的 Spring Integration Java DSL 配合使用时),必须使用该注解。 |
@EnableIntegration 注解在以下场景中也非常有用:当存在一个不包含 Spring Integration 组件的父上下文,以及两个或多个使用 Spring Integration 的子上下文时。
它允许这些通用组件仅在父上下文中声明一次。
@Component 注解向应用程序上下文注册了许多基础设施组件。
特别是,它:
-
注册一些内置 Bean,例如
errorChannel及其LoggingHandler、用于轮询器的taskScheduler、jsonPathSpEL 函数以及其他内容。 -
添加多个
BeanFactoryPostProcessor实例以增强全局和默认集成环境的BeanFactory。 -
添加几个
BeanPostProcessor实例,以增强或转换并封装特定 Bean,用于集成目的。 -
添加注解处理器以解析消息注解,并将相关组件注册到应用程序上下文中。
@IntegrationComponentScan 注解也允许进行类路径扫描。
该注解的作用与标准 Spring Framework 的 @ComponentScan 注解类似,但它仅限于特定于 Spring Integration 的组件和注解,而这些是标准 Spring Framework 组件扫描机制无法触及的。
有关示例,请参见 @MessagingGateway 注解。
@EnablePublisher 注解用于注册一个 PublisherAnnotationBeanPostProcessor Bean,并为那些未提供 channel 属性的 @Publisher 注解配置 default-publisher-channel。
如果找到多个 @EnablePublisher 注解,则它们必须具有相同的默认通道值。
有关更多信息,请参阅 使用 @Publisher 注解的注解驱动配置。
已引入 @GlobalChannelInterceptor 注解,用于标记需要全局通道拦截的 ChannelInterceptor Bean。
该注解是 <int:channel-interceptor> XML 元素的对应物(参见 全局通道拦截器配置)。
@GlobalChannelInterceptor 注解可以放置在类级别(配合 @Component 构造型注解),也可以放置在 @Configuration 类中的 @Bean 方法上。
无论哪种情况,Bean 都必须实现 ChannelInterceptor。
从版本 5.1 开始,全局通道拦截器适用于动态注册的通道——例如使用 beanFactory.initializeBean() 初始化的 Bean,或通过 Java DSL 在使用 IntegrationFlowContext 时初始化的 Bean。
此前,如果在应用上下文刷新后创建 Bean,拦截器将不会被应用。
@IntegrationConverter 注解将 Converter、GenericConverter 或 ConverterFactory Bean 标记为 integrationConversionService 的候选转换器。
该注解是 <int:converter> XML 元素的等效形式(参见 有效负载类型转换)。
您可以在类级别(配合 @Component 原型注解)或将 @IntegrationConverter 注解放置于 @Configuration 类中的 @Bean 方法上。
有关消息注解的更多信息,请参阅注解支持。
编程注意事项
Spring Integration 中的大多数类(除非另有说明)必须在应用程序上下文中声明为 Bean,并且是单例的。
这意味着这些类的实例是线程安全的,它们的生命周期以及与其他组件的连接由 Spring 依赖注入容器管理。
工具类和构建器类(JacksonJsonUtils, `MessageBuilder、ExpressionEvalMap、IntegrationReactiveUtils等)可以直接在 Java 代码中使用。
然而,Java DSL 工厂和IntegrationComponentSpec实现结果仍然需要注册为应用程序上下文中的 Bean。
Session抽象在许多模块中都存在,它不是线程安全的,通常由Factory模式实现创建,并从线程安全的Template模式中使用。
例如,请参见SftpRemoteFileTemplate及其与DefaultSftpSessionFactory的关系。
只要可能,您就应该使用普通 Java 对象(POJO)(用于目标逻辑中的消息处理),并且仅在绝对必要时才在代码中暴露框架。 请参阅 POJO 方法调用 以获取更多信息。
如果您将框架暴露给您的类,则有一些需要考虑的事项,特别是在应用程序启动期间:
-
如果您的组件是
ApplicationContextAware,通常不应在setApplicationContext()方法中使用ApplicationContext。 相反,应存储引用,并将此类使用推迟到上下文生命周期的后期。 -
如果您的组件是
InitializingBean或使用了@PostConstruct方法,请不要从这些初始化方法发送任何消息。 调用这些方法时,应用上下文尚未初始化,发送此类消息很可能失败。 如果您需要在启动期间发送消息,请实现ApplicationListener并等待ContextRefreshedEvent。 或者,实现SmartLifecycle,将您的 Bean 置于后期阶段,并从start()方法发送消息。
使用打包(例如,阴影)JAR 时的注意事项
Spring Integration 利用 Spring Framework 的 SpringFactories 机制来加载多个 IntegrationConfigurationInitializer 类,从而启动某些功能。
这包括 -core jar 包以及其他一些 jar 包,例如 -http 和 -jmx。
该过程的相关信息存储在每个 jar 包中的 META-INF/spring.factories 文件里。
一些开发者倾向于使用众所周知的工具,例如 Apache Maven Shade Plugin,将他们的应用程序和所有依赖项重新打包成一个单一的 jar 文件。
默认情况下,shade 插件在生成 shaded jar 时不会合并 spring.factories 文件。
除了 spring.factories 之外,其他 META-INF 文件(spring.handlers 和 spring.schemas)也用于 XML 配置。
这些文件也需要进行合并。
Spring Boot 的可执行 jar 机制采用了一种不同的方法,它将 jar 文件嵌套,从而在类路径上保留每个 spring.factories 文件。
因此,对于 Spring Boot 应用程序,如果使用其默认的可执行 jar 格式,则无需其他操作。 |
即使您不使用 Spring Boot,仍然可以使用 Boot 提供的工具来增强 shade 插件,通过为上述文件添加转换器。 以下示例展示了如何配置该插件:
...
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<dependencies>
<dependency> (1)
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers> (2)
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
...
具体来说,
| 1 | 添加 spring-boot-maven-plugin 作为依赖项。 |
| 2 | 配置转换器。 |
您可以为 ${spring.boot.version} 添加一个属性,或使用显式版本。
编程技巧与窍门
本节记录了一些充分利用 Spring Integration 的方法。
XML Schema
使用 XML 配置时,为避免出现错误的架构验证错误,您应使用“感知 Spring”的 IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。
这些 IDE 知道如何从类路径中解析正确的 XML 架构(通过使用 jars 中的 META-INF/spring.schemas 文件)。
当使用 STS 或带有插件的 Eclipse 时,您必须在项目上启用 Spring Project Nature。
互联网上托管的某些遗留模块(即在 1.0 版本中存在的模块)的模式出于兼容性原因仍为 1.0 版本。 如果您的 IDE 使用这些模式,则可能会出现误报错误。
每个这些在线架构都有类似于以下的警告:
|
此架构适用于 Spring Integration Core 的 1.0 版本。 我们无法将其更新为当前架构,因为这会破坏任何使用 1.0.3 或更低版本的应用程序。 对于后续版本,“无版本”架构将从类路径中解析并从 jar 文件中获取。 请参阅 GitHub: |
受影响的模块包括
-
core(spring-integration.xsd) -
file -
http -
jms -
mail -
security -
stream -
ws -
xml
查找 Java 和 DSL 配置的类名
通过 XML 配置和 Spring Integration 命名空间支持,XML 解析器隐藏了目标 Bean 的声明和连接方式。 对于 Java 配置,了解面向最终用户应用程序的框架 API 至关重要。
EIP 实现的一等公民是 Message、Channel 和 Endpoint(参见本章前面的 主要组件)。
它们的实现(合约)如下:
前两个非常简单,易于理解如何实现、配置和使用。 最后一个值得更多关注
AbstractEndpoint 在 Spring Framework 中被广泛用于不同的组件实现。
其主要实现包括:
-
EventDrivenConsumer,用于我们订阅SubscribableChannel以监听消息时。 -
PollingConsumer,用于当我们从PollableChannel轮询消息时。
当您使用消息注解或 Java DSL 时,无需担心这些组件,因为框架会自动生成它们,并附带适当的注解和 BeanPostProcessor 实现。
在手动构建组件时,应使用 ConsumerEndpointFactoryBean 来根据提供的 inputChannel 属性确定要创建的 target AbstractEndpoint 消费者实现。
另一方面,ConsumerEndpointFactoryBean 委托给框架中的另一个一等公民 - org.springframework.messaging.MessageHandler。
该接口的实现目标是处理由端点从通道消费的消息。
Spring Integration 中的所有 EIP 组件都是 MessageHandler 的实现(例如,AggregatingMessageHandler、MessageTransformingHandler、AbstractMessageSplitter 等)。
目标协议出站适配器(FileWritingMessageHandler、HttpRequestExecutingMessageHandler、AbstractMqttMessageHandler 等)也是 MessageHandler 的实现。
当您使用 Java 配置开发 Spring Integration 应用程序时,应查看 Spring Integration 模块以找到合适的 MessageHandler 实现,用于 @ServiceActivator 配置。
例如,要发送 XMPP 消息(参见 XMPP 支持),您应该配置如下内容:
@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);
DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
xmppHeaderMapper.setRequestHeaderNames("*");
handler.setHeaderMapper(xmppHeaderMapper);
return handler;
}
MessageHandler 的实现代表消息流的出站和处理部分。
入站消息流侧拥有其专属组件,这些组件分为轮询和监听行为。
监听(消息驱动)组件较为简单,通常只需实现一个目标类即可准备就绪以生成消息。
监听组件可以是一向的 MessageProducerSupport 实现(例如 AbstractMqttMessageDrivenChannelAdapter 和 ImapIdleChannelAdapter),也可以是请求 - 回复式的 MessagingGatewaySupport 实现(例如 AmqpInboundGateway 和 AbstractWebServiceInboundGateway)。
轮询入站端点适用于那些不提供监听器 API 或不打算采用此类行为的协议,包括任何基于文件的协议(如 FTP)、任何数据库(关系型数据库或 NoSQL)以及其他协议。
这些入站端点由两个组件组成:轮询器配置,用于定期启动轮询任务,
以及一个消息源类,用于从目标协议读取数据并为下游集成流生成消息。
第一个用于轮询配置的类是 SourcePollingChannelAdapter。
它是 AbstractEndpoint 的一个实现,但专门用于启动集成流的轮询。
通常,在使用消息注解或 Java DSL 时,您无需担心此类。
框架会根据 @InboundChannelAdapter 配置或 Java DSL 构建器规范为其生成一个 Bean。
消息源组件对于目标应用程序的开发更为重要,它们都实现了 MessageSource 接口(例如 MongoDbMessageSource 和 AbstractTwitterMessageSource)。
考虑到这一点,我们使用 JDBC 从 RDBMS 表中读取数据的配置可能如下所示:
@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}
您可以在特定的 Spring Integration 模块中找到目标协议所需的所有入站和出站类(在大多数情况下,位于相应的包中)。
例如,spring-integration-websocket 适配器是:
-
o.s.i.websocket.inbound.WebSocketInboundChannelAdapter: 实现MessageProducerSupport以监听套接字上的帧并向通道生成消息。 -
o.s.i.websocket.outbound.WebSocketOutboundMessageHandler: 将传入消息转换为适当帧并通过 WebSocket 发送的单向AbstractMessageHandler实现。
如果您熟悉 Spring Integration XML 配置,从版本 4.3 开始,我们在 XSD 元素定义中提供了关于用于声明适配器或网关 Bean 的目标类的信息,如下示例所示:
<xsd:element name="outbound-async-gateway">
<xsd:annotation>
<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
</xsd:documentation>
</xsd:annotation>
POJO 方法调用
正如在编程注意事项中所讨论的,我们建议采用 POJO 编程风格,如下示例所示:
@ServiceActivator
public String myService(String payload) { ... }
在这种情况下,框架会提取一个 String 负载,调用您的方法,并将结果封装为消息发送到流程中的下一个组件(原始标头会被复制到新消息中)。
实际上,如果您使用 XML 配置,甚至不需要 @ServiceActivator 注解,如下面的配对示例所示:
<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }
只要类中的公共方法没有歧义,您就可以省略 method 属性。
您也可以在 POJO 方法中获取标头信息,如下示例所示:
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
您也可以像以下示例所示,对消息上的属性进行解引用:
@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }
由于各种 POJO 方法调用是可用的,5.0 之前的版本使用 SpEL(Spring 表达式语言)来调用 POJO 方法。
与这些操作中通常实际执行的工作相比,SpEL(即使是解释执行的)通常“足够快”。
然而,从 5.0 版本开始,在可能的情况下默认使用 org.springframework.messaging.handler.invocation.InvocableHandlerMethod。
这种技术的执行速度通常比解释型 SpEL 更快,并且与其他 Spring 消息传递项目保持一致。
InvocableHandlerMethod 类似于 Spring MVC 中用于调用控制器方法的技术。
在使用 SpEL 时,某些方法仍然始终被调用。
示例包括带有解引用属性的注解参数,如前所述。
这是因为 SpEL 具有导航属性路径的能力。
可能存在一些我们尚未考虑到的其他极端情况,这些情况在 InvocableHandlerMethod 个实例下也无法正常工作。
因此,在这些情况下,我们会自动回退到使用 SpEL。
如果您愿意,也可以使用 @Value 注解配置您的 POJO 方法,使其始终使用 SpEL,如下示例所示:
@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }
如果省略了 compilerMode 属性,则 spring.expression.compiler.mode 系统属性将决定编译器模式。
有关编译后 SpEL 的更多信息,请参阅 SpEL 编译。