|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
使用协议适配器
到目前为止,所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息架构。 然而,我们尚未进行任何真正的集成。 实现这一点需要能够通过网络协议(如 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等)访问远程资源,或访问本地文件系统。 Spring Integration 支持所有这些功能以及更多功能。 理想情况下,DSL 应为所有这些功能提供一等公民级别的支持,但实现所有这些功能并随着新适配器添加到 Spring Integration 中而保持同步是一项艰巨的任务。 因此,预期是 DSL 将不断追赶 Spring Integration 的更新。
因此,我们提供高级 API 以无缝地定义特定于协议的 Messaging。
我们通过工厂模式和构建者模式以及 Lambda 表达式来实现这一点。
您可以将工厂类视为“命名空间工厂”,因为它们所扮演的角色与来自具体协议特定 Spring Integration 模块的组件所使用的 XML 命名空间相同。
目前,Spring Integration Java DSL 支持 Amqp、Feed、Jms、Files、(S)Ftp、Http、JPA、MongoDb、TCP/UDP、Mail、WebFlux 和 Scripts 命名空间工厂。
以下示例展示了如何使用其中三个(Amqp、Jms 和 Mail):
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
上述示例展示了如何将“命名空间工厂”用作内联适配器声明。
然而,您可以从 @Bean 定义中使用它们,以使 IntegrationFlow 方法链更具可读性。
| 我们正在征求社区对这些命名空间工厂的反馈,以便在投入其他工作之前收集意见。 我们也非常欢迎任何关于接下来应优先支持哪些适配器和网关的建议。 |
您可以在本参考手册中各协议特定章节找到更多 Java DSL 示例。
所有其他协议通道适配器都可以配置为通用 Bean,并像以下示例所示连接到 IntegrationFlow:
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}