Java DSL
Java DSL
Spring Integration Java 配置和 DSL 提供了一套便捷的构建器和流畅的 API,允许你从 Spring 配置 Spring Integration 消息流@Configuration类。
(另见Kotlin DSL。)
(另见Groovy DSL。)
Spring 集成的 Java DSL 本质上是 Spring 集成的一个表象。
DSL 提供了一种简单的方法,通过使用 fluent 将 Spring 集成消息流嵌入您的应用程序中架构工人模式与 Spring Framework 和 Spring Integration 中的现有 Java 配置一起使用。
我们还使用并支持 lambdas(Java 8 提供),以进一步简化 Java 配置。
这家咖啡馆是DSL使用的绝佳范例。
DSL由集成流程流流 API(参见IntegrationFlowBuilder).
这会产生集成流程component,应被注册为Spring bean(通过使用@Bean注释)。
构建者模式用于将任意复杂的结构表达为可接受λ参数的方法层级结构。
这IntegrationFlowBuilder仅收集积分分量(消息频道实例摘要终点实例,依此类推)在集成流程豆子用于在应用环境中对混凝土豆进行进一步解析和注册,由集成FlowBeanPostProcessor.
Java DSL 直接使用 Spring 集成类,绕过任何 XML 生成和解析。 然而,DSL提供的不仅仅是在XML上的语法糖。 其最吸引人的特点之一是能够定义内联lambda以实现端点逻辑,从而消除了实现自定义逻辑时需要外部类的需求。 在某种意义上,Spring Integration对Spring表达式语言(SpEL)和内联脚本的支持解决了这个问题,但lambda更简单且更强大。
以下示例展示了如何使用 Java 配置进行 Spring 集成:
@Configuration
@EnableIntegration
public class MyConfiguration {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
上述配置示例的结果是,它在 之后创造应用上下文启动、Spring Integration 端点和消息通道。
Java 配置既可用于替代也可用于增强 XML 配置。
你不必替换所有现有的XML配置才能使用Java配置。
DSL 基础知识
这org.springframework.integration.dsl包包含IntegrationFlowBuilder前面提到的API,以及一些IntegrationComponentSpec实现,同时也是构建者,并提供流畅的API来配置具体端点。
这IntegrationFlowBuilder基础设施为基于消息的应用(如通道、端点、轮询器和通道拦截器)提供了通用的企业集成模式(EIP)。
端点在DSL中以动词表示以提高可读性。 以下列表包括常见的DSL方法名称及相关的EIP端点:
-
变革→
转换器 -
滤→
Filter -
处理→
服务激活器 -
分裂→
分配器 -
总计 →
聚合 -
路线→
路由器 -
桥→
桥
从概念上讲,集成过程是通过将这些端点组合成一个或多个消息流来构建的。
注意,EIP并未正式定义“消息流”一词,但将其视为使用已知消息模式的工作单元是有用的。
DSL提供了集成流程组件 来定义通道及其间端点的组合,但现在集成流程仅在应用上下文中扮演配置角色,填充真实豆子,运行时不使用。
然而,豆子的集成流程可以自动接线为生命周期控制开始()和停止()对整个流程,该流程委托给所有与此关联的春季集成组件集成流程.
以下示例使用了集成流程流畅API用于定义集成流程通过使用 EIP 方法进行豆子IntegrationFlowBuilder:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<String, Integer>transform(Integer::parseInt)
.get();
}
这变换方法接受 lambda 作为端点参数来作消息有效载荷。
该方法的实参数为通用变换器变换器<S,T型变换器>实例。
因此,任何提供的变换器(ObjectToJsonTransformer,FileToStringTransformer, 以及其他)可以在这里使用。
被窝里,IntegrationFlowBuilder识别消息处理器以及它的端点,令消息变换处理程序和ConsumerEndpointFactoryBean分别。
再举一个例子:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("input")
.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println)
.get();
}
前述例子由转换器→服务激活器→Filter.
流程是“单向”的。
也就是说,它不提供回复消息,只打印有效载荷到 STDOUT。
端点通过直接通道自动布线。
|
Lambda 和
留言<?>参数在EIP方法中使用λ时,“输入”参数通常是消息有效载荷。
如果你想访问整个消息,可以使用那些过载的方法之一,该方法对
在运行时,当 相反,请使用:
|
|
豆子定义覆盖
Java DSL 可以注册 Flow 定义中内联定义对象的 beans,并且可以复用已注入的 beans。
如果内联对象定义相同的豆名和现有的豆定义,则 |
消息频道
此外IntegrationFlowBuilder通过EIP方法,Java DSL提供了一个流畅的API来配置消息频道实例。
为此消息频道提供建造工厂。
以下示例展示了如何使用它:
@Bean
public MessageChannel priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap())
.get();
}
一样消息频道建造工厂可以用于通道()EIP方法IntegrationFlowBuilder连接到端点,类似于布线输入通道/输出通道在XML配置中。
默认情况下,端点有线直达频道豆名基于以下模式的例子:[IntegrationFlow.beanName].channel#[channelNameIndex].
该规则同样适用于由内联生成的无名信道消息频道架构工厂的使用情况。
然而,所有消息频道方法存在一种变体,能够识别channelId(频道识别)你可以用它来设置豆子名称消息频道实例。
这消息频道参考文献及豆名可以用作豆子方法调用。
以下示例展示了使用该条件的可能方法通道()EIP方法:
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
@Bean
public MessageChannel publishSubscribe() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
来自(“输入”)意思是“'找到并使用消息频道带有“输入”ID,或者创建一个“。 -
固定订阅者频道()产生一个实例固定订阅频道并以channelFlow.channel#0. -
channel(“queueChannel”)工作原理相同,但使用已有的queueChannel豆。 -
频道(publishSubscribe())是豆子法的参考。 -
channel(MessageChannels.executor(“executorChannel”, this.taskExecutor))是IntegrationFlowBuilder这暴露了IntegrationComponentSpec前往执行者频道并将其注册为执行者通道. -
通道(“输出”寄存器直达频道豆子输出只要没有带有该名称的豆子存在,就算是它的名字。
注:前述内容集成流程定义有效,且其所有通道都应用于满足桥接处理者实例。
注意使用相同的内联通道定义消息频道工厂来自不同集成流程实例。
即使DSL解析器将不存在的对象注册为豆子,也无法判定相同的对象(消息频道)来自不同的集成流程器皿。
以下例子是错误的: |
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
这个糟糕例子的结果是以下例外:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
要让它生效,你需要申报@Bean对于该通道,并使用其Beans方法,来自不同的集成流程实例。
民调员
Spring Integration还提供了一个流畅的API,允许你配置Poller元数据为摘要投票终点实现。
你可以使用民调员构建工厂用于配置通用豆定义或由IntegrationFlowBuilderEIP方法,如下示例所示:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500)
.errorChannel("myErrors");
}
看民调员和PollerSpec更多信息请参见 Javadoc。
如果你用DSL构造一个PollerSpec作为@Bean,不要调用get()豆子定义中的方法。
这PollerSpec是工厂豆这会产生Poller元数据规范中的对象,并初始化了其所有属性。 |
这响应式()端点
从5.5版本开始,消费者端点规格提供响应式()带有可选自定义器的配置属性功能<?超级Flux<Message<?>>,?扩展出版商<信息<?>>>.
该选项将目标端点配置为ReactiveStreamsConsumer实例,独立于输入通道类型,该通道类型被转换为通量通过IntegrationReactiveUtils.messageChannelToFlux().
所提供的函数来自Flux.transform()操作员自定义(publishOn(),log(),doOnNext()等等)输入通道的无功流源。
以下示例演示了如何独立于最终订阅者和生产者将发布线程从输入通道切换到直达频道:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
.get();
}
更多信息请参见响应式流支持。
DSL 与终端配置
都IntegrationFlowBuilderEIP方法有一种变体,应用λ参数来提供摘要终点实例:SmartLifecycle,Poller元数据,请求-处理-建议链,以及其他。
它们各自都有通用参数,所以它允许你配置端点,甚至它的消息处理器在上下文中,如下例子所示:
@Bean
public IntegrationFlow flow2() {
return IntegrationFlow.from(this.inputChannel)
.transform(new PayloadSerializingTransformer(),
c -> c.autoStartup(false).id("payloadSerializingTransformer"))
.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
.get();
}
此外,端点规范提供id()方法让你注册一个端点豆,而不是生成的豆名。
如果消息处理器以豆子(ben)为参照,则任意存在建议链如果.advice()方法存在于DSL定义中:
@Bean
public TcpOutboundGateway tcpOut() {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(cf());
gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
return gateway;
}
@Bean
public IntegrationFlow clientTcpFlow() {
return f -> f
.handle(tcpOut(), e -> e.advice(testAdvice()))
.transform(Transformers.objectToString());
}
它们没有合并,只是测试建议()这次用的是豆子。
变形金刚
DSL API 提供了便捷且流畅的变形金刚工厂作为内联目标对象定义,在.transform()EIP方法。
以下示例展示了如何使用它:
@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlow.from("input")
.transform(Transformers.fromJson(MyPojo.class))
.transform(Transformers.serializer())
.get();
}
它避免了使用设置器进行不方便的编码,并使流程定义更加简单。
注意你可以使用变形金刚宣告目标转换器实例@Bean实例,并且再次使用它们来自集成流程定义为Beans方法。
尽管如此,DSL解析器会处理内联对象的豆声明(如果它们尚未被定义为豆子)。
更多信息及支持的工厂方法,请参见 Javadoc 中的 Transformer。
入站通道适配器
通常,消息流从入站通道适配器开始(例如<int-jdbc:入站通道适配器>).
适配器配置为<poller>,并且它会问一个MessageSource<?>定期发送消息。
Java DSL 允许启动集成流程来自MessageSource<?>太。
为此,集成流程流利的API提供了超载IntegrationFlow.from(MessageSource<?> messageSource)方法。
你可以配置MessageSource<?>作为一个豆子,并以此作为该方法的论据。
第二个参数IntegrationFlow.from()是Consumer<SourcePollingChannelAdapterSpec>lambda 允许你提供选项(例如Poller元数据或SmartLifecycle)SourcePollingChannelAdapter.
以下示例展示了如何使用流流API和λ来创建集成流程:
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
对于那些没有建造需求的情况消息直接使用对象,你可以使用IntegrationFlow.fromSupplier()基于java.util.function.Supplier.
结果Supplier.get()自动被包裹在消息(如果它还不是消息).
消息路由器
Spring Integration 原生提供专用的路由器类型,包括:
-
头值路由器 -
有效载荷类型路由器 -
ExceptionTypeRouter -
收件人列表路由器 -
XPathRouter
与许多其他DSL一样IntegrationFlowBuilderEIP方法,路线()方法可以应用任意方法摘要消息路由器实现,或者为方便起见,使用字符串作为SpEL表达式或裁判-方法双。
此外,你还可以配置路线()带有λ,并使用λ表示Consumer<RouterSpec<MethodInvokingRouter>>.
流利的API还提供摘要地图消息路由器选项包括channelMapping(字符串键,字符串channelName)如下例子所示:
@Bean
public IntegrationFlow routeFlowByLambda() {
return IntegrationFlow.from("routerInput")
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.suffix("Channel")
.channelMapping(true, "even")
.channelMapping(false, "odd")
)
.get();
}
以下示例展示了一个简单的基于表达式的布线器:
@Bean
public IntegrationFlow routeFlowByExpression() {
return IntegrationFlow.from("routerInput")
.route("headers['destChannel']")
.get();
}
这routeToRecipients()方法取Consumer<RecipientListRouterSpec>,如下示例所示:
@Bean
public IntegrationFlow recipientListFlow() {
return IntegrationFlow.from("recipientListInput")
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
.routeToRecipients(r -> r
.recipient("thing1-channel", "'thing1' == payload")
.recipientMessageSelector("thing2-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("thing3"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"thing3".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
.get();
}
这.defaultOutputToParentFlow()关于.routeToRecipients()定义可以让你设置路由器的defaultOutput作为通道,用于继续处理主流中未匹配消息的进程。
分配器
要创建分线器,可以使用分裂()EIP方法。
默认情况下,如果有效载荷是可迭代一迭 代一数组一个流或者说是响应式发行人这分裂()方法将每个项目输出为单独的消息。
它接受 lambda、SpEL 表达式或任意摘要消息分流器实现。
或者,你也可以用它来不带参数,提供默认消息分裂器.
以下示例展示了如何使用分裂()通过提供一个λ来实现方法:
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlow.from("splitInput")
.split(s -> s.applySequence(false).delimiters(","))
.channel(MessageChannels.executor(taskExecutor()))
.get();
}
前面的例子创建了一个分线器,用于拆分包含逗号分隔的消息字符串.
聚合器与重编序列器
一聚合在概念上是 的对立面分配器.
它将一系列单独消息聚合成一条消息,且必然更为复杂。
默认情况下,聚合器返回包含来自新消息的有效载荷集合的消息。
同样的规则适用于重序器.
以下示例展示了分频-聚合器模式的一个典型示例:
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
这分裂()方法将列表拆分为单独的消息,并发送给执行者频道.
这重序()方法根据消息头部中的序列细节重新排序消息。
这aggregate()方法收集这些消息。
不过,你可以通过指定发布策略和相关策略等来更改默认行为。 请考虑以下例子:
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
前述示例关联了具有myCorrelationKey在累积至少十条消息后,释放头部和释放消息。
类似的λ配置也适用于重序()EIP方法。
服务激活器和.handle()方法
这.handle()EIP方法的目标是调用任意消息处理器实现或任何方法在某个POJO上。另一种选择是通过使用λ表达式定义一个“活动”。因此,我们引入了一个通用的通用处理<P>功能性接口。 其处理方法需要两个参数:P有效载荷和MessageHeaders 头部(从版本5.1开始)。有了这个,我们可以定义一个流程如下:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("flow3Input")
.<Integer>handle((p, h) -> p * 2)
.get();
}
前例会使其接收到的任意整数加倍。
然而,春季集成的一个主要目标是松耦合通过将运行时类型从消息有效载荷转换为消息处理程序的目标参数。由于 Java 不支持对 lambda 类的通用类型解析,我们引入了一个额外的变通方法有效载荷类型关于大多数EIP方法的论证LambdaMessage处理器. 这样做将艰难的改造工作委托给了斯普林转换服务,该 使用 所提供的类型以及请求给目标方法参数的消息。以下示例展示了得到的结果集成流程可能看起来像:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<byte[], String>transform(p - > new String(p, "UTF-8"))
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
我们也可以注册一些BytesToInteger转换器在转换服务去掉那个额外的.transform():
@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
运营商网关()
这网关()在集成流程定义是一种特殊的服务激活器实现,通过其输入通道调用其他端点或集成流并等待回复。技术上它与嵌套功能相同<网关>在 a 中的组件<链>定义(参见链中链调用)并使流程更简洁、更直接。从逻辑上讲,从业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分配和重用功能(参见消息网关)。该操作员有多个针对不同目标的超载:
-
gateway(String requestChannel)向某个端点的输入通道发送消息,按其名称; -
网关(MessageChannel requestChannel)通过直接注入向某端点的输入通道发送消息; -
网关(IntegrationFlow 流)向所提供的输入信道发送消息集成流程.
这些都有与第二个变体的变体Consumer<GatewayEndpointSpec>参数以配置目标网关消息处理器以及摘要终点. 另外,还有集成流程基于的方法允许调用现有方法集成流程或通过原位lambda声明该流为子流,用于集成流程功能接口或在私人方法清洁器代码样式:
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流量不总是返回回复,你应该设置请求超时将 0 设置为 0,以防止调用线程无限期挂起。在这种情况会在该点结束,线程被释放以便继续工作。 |
操作员日志()
为了方便起见,记录消息通过 Spring 集成流程(<日志通道适配器>),alog()表示了算子。内部,它由窃听 通道拦截者其中日志处理器作为其订阅者。它负责将收到的消息记录到下一个端点或当前信道。以下示例展示了如何使用日志处理器:
.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)
在上述例子中,一个身份证头部记录在错误水平调到测试。类别仅适用于通过过滤且路由前的消息。
从6.0版本开始,该作符在流程末尾的行为与其中间的使用方式保持一致。换句话说,即使log()算符被移除。因此,如果流程末尾不期望产生回复,则nullChannel()建议在最后一次之后使用log().
算符截距()
从5.3版本开始,拦截()操作员允许注册一个或多个通道拦截者当前实例消息频道在流程中。这是创建显式消息频道通过消息频道应用程序接口。
以下示例使用了一个消息选择拦截器拒绝某些消息但有例外:
.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)
MessageChannelSpec.wireTap()
春季集成包括.wireTap()流流 API消息通道规范建设者。
以下示例展示了如何使用窃听记录输入的方法:
@Bean
public QueueChannelSpec myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input");
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
|
如果
|
当电流消息频道不实现拦截频道,一个隐式直达频道和桥接处理者被注射到中集成流程,以及窃听新增直达频道.
以下示例没有任何信道声明:
.handle(...)
.log()
}
在前述例子中(以及任何未声明通道时),隐式直达频道被注入在当前位置集成流程并用作当前配置的输出通道服务激活处理器(摘自.handle(),前面描述的)。
处理消息流
IntegrationFlowBuilder提供顶级 API,用于生成与消息流相关的集成组件。
当集成可能通过单一流程完成(这通常如此),这非常方便。
交互集成流程实例可以通过以下方式连接消息频道实例。
默认情况下,消息流在 Spring Integration 的术语中表现为“链”。
也就是说,端点是自动且隐式地由直达频道实例。
消息流实际上并非构建成链条,这提供了更大的灵活性。
例如,如果你知道流程中的某个组件,你可以向它发送消息输入通道名称(即如果你明确定义的话)。
你也可以在流中引用外部定义的通道,以便使用通道适配器(以启用远程传输协议、文件输入输出等),而非直接通道。
因此,DSL 不支持 Spring 集成链元素,因为在这种情况下它并没有带来太多价值。
由于 Spring 集成,Java DSL 生成的豆定义模型与其他配置选项相同,且基于现有的 Spring 框架@Configuration基础设施,它可以与XML定义一起使用,并与Spring Integration消息注释配置结合。
你也可以定义直接集成流程通过使用 lambda 来实现实例。
以下示例展示了如何实现:
@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
该定义的结果是,与隐含的直接通道连接的积分分量相同。
唯一的限制是该流起始于一个命名的直接通道——lambdaFlow.input.
此外,Lambda流不能从消息源或消息制作人.
从5.1版本开始,这种集成流程被包裹到代理中,以暴露生命周期控制并提供对输入通道内部关联的标准集成流程.
从版本5.0.6开始,生成的组件豆名集成流程先加上流豆和点(.)作为前缀。
例如,ConsumerEndpointFactoryBean对于.transform(“Hello ”::concat)在前一个样本中,结果为豆名lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0.
(O.S.I.是从org.springframework.integration才能融入纸上。)
这转换器该端点的实现豆具有一个豆名lambdaFlow.transformer#0(从版本5.1开始),其中不再是完全限定的名称MethodInvokingTransformer使用类,其组件类型。
所有NamedComponent当豆子名称必须在流程中生成时。
这些生成的豆名前加上流ID,用于解析日志或在某些分析工具中将组件分组,以及避免运行时同时注册集成流时出现竞态。
更多信息请参见动态集成和运行时集成流程。
函数表达式
我们引入了函数表达式类(SpEL 的实现表达接口)以便我们使用 lambda 和泛 型.
这Function<T, R>DSL 组件提供了选项,并配有表达当有隐含的策略源自Core Spring集成的变体。
以下示例展示了如何使用函数表达式:
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
这函数表达式也支持运行时类型转换,如SpelExpression.
子流支持
一些如果。。。还和发布订阅组件通过使用子流来指定其逻辑或映射的能力。
最简单的样本为.publishSubscribeChannel(),如下示例所示:
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
你也可以用分开达到同样的结果集成流程 @Bean定义,但我们希望你觉得子流式逻辑组合对你有用。
我们发现,这会让代码更短(因此更易读)。
从5.3版本开始,a广播能力频道-基于publishSubscribeChannel()实现方式用于配置经纪人支持的消息通道上的子流用户。
例如,我们现在可以将多个订阅者配置为子流Jms.publishSubscribeChannel():
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
.destination("pubsub")
.get();
}
类似的发布订阅子流组成提供.routeToRecipients()方法。
另一个例子是.discardFlow()而不是.discardChannel()在.filter()方法。
这.route()值得特别关注。
请考虑以下例子:
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
这.channelMapping()正常运行路由器映射,但.subFlowMapping()把那个子流连接到主流。
换句话说,任一路由器的子流在之后返回主流.route().
|
有时,你需要参考现有的
Caused by: org.springframework.beans.factory.BeanCreationException:
The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
This is the end of the integration flow.
当你将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,无需网关。 |
子流量可以嵌套到任意深度,但我们不建议这样做。 事实上,即使在路由器的情况下,在流程中加入复杂的子流也会很快看起来像一盘意大利面,人类很难理解。
|
在DSL支持子流配置的情况下,通常需要一个通道用于配置的组件,而该子流以
框架内部创建了 |
使用协议适配器
目前展示的所有示例都展示了 DSL 如何通过使用 Spring Integration 编程模型支持消息传递架构。 然而,我们还没有真正实现任何整合。 这样做需要通过HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP等访问远程资源,或访问本地文件系统。 Spring Integration 支持所有这些以及更多功能。 理想情况下,DSL应为所有这些软件提供一流的支持,但实现这些并跟上Spring Integration新适配器的加入是一项艰巨的任务。 因此,预计DSL会不断赶上春季集成。
因此,我们提供了高级 API,以无缝定义协议特定的消息传递。
我们用工厂和建造者模式以及lambda来实现这一点。
你可以把工厂类看作“命名空间工厂”,因为它们在具体协议专用的 Spring Integration 模块中,扮演着与 XML 命名空间相同的角色。
目前,Spring Integration Java DSL 支持AMQP,饲料,JMS,文件,(S)FTP,http,JPA,MongoDb,TCP/UDP,邮件,网络流和脚本命名空间工厂。
以下示例展示了如何使用其中三个(AMQP,JMS和邮件):
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
前面的例子展示了如何使用“命名空间工厂”作为内联适配器声明。
不过,你也可以从@Bean定义以实现集成流程方法链更易读。
| 我们正在征求社区对这些命名空间工厂的反馈,再考虑其他工厂。 我们也非常感谢任何关于优先支持哪些适配器和网关的建议。 |
你可以在本参考手册的各协议章节中找到更多Java DSL示例。
所有其他协议通道适配器都可以配置为通用豆,并接线到集成流程,如下示例所示:
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}
集成流适配器
这集成流程接口可以直接实现并指定为扫描组件,如下示例所示:
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
它被集成FlowBeanPostProcessor并且在应用上下文中正确解析和注册。
为了方便并享受松耦合架构的优势,我们提供了集成流适配器基础类实现。
它需要buildFlow()实现方法以产生集成流程定义通过使用以下一个from()方法,如下示例所示:
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new AtomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this::messageSource,
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("thing1", "THING1"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "T,H,I,N,G,2";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> thing1) {
return thing1.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String thing1) {
return payload + ":" + thing1;
}
}
动态集成流与运行时集成流程
集成流程其所有依赖分量都可以在运行时注册。
在5.0版本之前,我们使用的是BeanFactory.registerSingleton()钩。
从春季框架开始5.0,我们使用实例提供商程序化钩子豆子定义注册。
以下示例展示了如何编程注册豆子:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
注意,在前面的例子中,实例提供商钩子是genericBean定义此时由 lambda 提供。
所有必要的豆初始化和生命周期都是自动完成的,就像标准上下文配置豆定义一样。
为了简化开发体验,推出了 Spring 集成集成流上下文注册和管理集成流程运行时实例,如下示例所示:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
当我们有多个配置选项,需要创建多个类似流程实例时,这非常有用。
为此,我们可以不断迭代选项,创建并注册集成流程循环中的实例。
另一种变体是数据来源不是基于Spring的,因此我们必须即时创建数据。
这样的示例就是响应式流事件源,如下示例所示:
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
这集成流程注册构建器(由于IntegrationFlowContext.registration())可用于指定集成流程登记,控制其自动启动注册时,非Spring集成豆。
通常,这些额外的豆子是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、串行器和解串器,或其他必要的支持组件。
你可以使用IntegrationFlowRegistration.destroy()回调以移除动态注册集成流程以及所有依赖豆子的依赖,当你不再需要它们时。
参见集成流上下文Javadoc更多信息请见。
从5.0.6版本开始,所有生成的豆子名称都在集成流程定义前缀是流 ID 作为前缀。
我们建议始终指定一个显式的流程ID。
否则,同步势垒将在集成流上下文,生成豆名集成流程并登记了它的豆子。
我们在这两个作上同步,以避免当生成的豆子名称可能被用于不同时出现的竞争条件集成流程实例。 |
此外,从5.0.6版本开始,注册构建API新增了一种方法:useFlowIdAsPrefix().
如果你想声明同一流程的多个实例,并且当流程中组件具有相同 ID 时避免豆名冲突,这非常有用,如下示例所示:
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在这种情况下,第一个流程的消息处理程序可以用 bean a 的名称引用TCP1.client.handler.
一身份证属性是你使用E时必需的useFlowIdAsPrefix(). |
集成流程作为门户
这集成流程可以从提供GatewayProxyFactoryBean组件,如下示例所示:
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from(ControlBusGateway.class)
.controlBus()
.get();
}
所有接口方法的代理都随通道一起提供,用于向下一个集成组件发送消息集成流程.
你可以用@MessagingGateway注释并用@Gateway附注。
然而,请求频道被忽略并被该内部信道覆盖,用于下一个组件集成流程.
否则,通过 创建这样的配置集成流程说不通。
默认情况下,GatewayProxyFactoryBean获得一个传统的豆名,例如[FLOW_BEAN_NAME.gateway].
你可以通过使用@MessagingGateway.name()属性或过载IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)工厂方法。
此外,所有属性来自@MessagingGateway接口上的注释被应用于目标GatewayProxyFactoryBean.
当注释配置不适用时,Consumer<GatewayProxySpec>变体可用于为目标代理提供合适的选项。
该DSL方法从5.2版本开始使用。
在 Java 8 中,你甚至可以创建一个集成网关java.util.function如下示例所示:
@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
.<Object>handle((p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}
那errorRecoveryerFlow可以如下方式使用:
@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;
DSL 扩展
从5.3版本开始,一个集成流扩展被引入以允许通过自定义或组合EIP作符扩展现有Java DSL。
只需对该类进行扩展,提供可用于集成流程Beans定义。
扩展类也可以用于自定义IntegrationComponentSpec配置;例如,可以在现有的中实现遗漏或默认选项IntegrationComponentSpec外延。
下面的示例展示了复合自定义算子及其在聚合器规格默认自定义的扩展输出处理器:
public class CustomIntegrationFlowDefinition
extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {
public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
return split()
.transform("payload.toUpperCase()");
}
public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
return register(new CustomAggregatorSpec(), aggregator);
}
}
public class CustomAggregatorSpec extends AggregatorSpec {
CustomAggregatorSpec() {
outputProcessor(group ->
group.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(", ")));
}
}
对于方法链流,这些扩展中的新DSL算子必须返回扩展类。
这样一个目标集成流程定义适用于新旧DSL操作员:
@Bean
public IntegrationFlow customFlowDefinition() {
return
new CustomIntegrationFlowDefinition()
.log()
.upperCaseAfterSplit()
.channel("innerChannel")
.customAggregate(customAggregatorSpec ->
customAggregatorSpec.expireGroupsUponCompletion(true))
.logAndReply();
}
积分流组成
与消息频道作为春季整合中的一等公民,整合流的组成始终被假设。
流量中任一端点的输入通道都可以用于从任何其他端点发送消息,而不仅仅是从拥有该通道输出端点的端点。
此外,当@MessagingGateway合同、内容丰富器组件、复合端点如<链>,现在集成流程Beans(例如:集成流适配器),它足够简单,可以将业务逻辑分散到更短且可重复使用的部分之间。
最终作文所需的仅仅是关于 的知识消息频道发送或接收。
从版本开始5.5.4,进一步抽象消息频道并对最终用户隐藏实现细节,集成流程引入from(IntegrationFlow)工厂方法以允许电流启动集成流程从现有流量的输出中:
@Bean
IntegrationFlow templateSourceFlow() {
return IntegrationFlow.fromSupplier(() -> "test data")
.channel("sourceChannel")
.get();
}
@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
return IntegrationFlow.from(templateSourceFlow)
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("compositionMainFlowResult"))
.get();
}
另一方面,集成流程定义已添加to(IntegrationFlow)端子作符继续在其他流的输入通道上流动电流:
@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
return f -> f
.<String, String>transform(String::toUpperCase)
.to(otherFlow);
}
@Bean
IntegrationFlow otherFlow() {
return f -> f
.<String, String>transform(p -> p + " from other flow")
.channel(c -> c.queue("otherFlowResultChannel"));
}
流动中间的组成可以用现有的gateway(IntegrationFlow)EIP方法。
这样,我们可以通过从更简单、可重用的逻辑块组合出任意复杂度的流程。
例如,你可以添加一个库集成流程Beans 作为依赖,只需将他们的配置类导入最终项目并自动布线即可集成流程定义。