|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
消息网关
网关隐藏了 Spring Integration 提供的消息传递 API。 它让你的应用的业务逻辑对 Spring 集成 API 一无所知。 通过使用通用网关,你的代码只需与简单的接互。
这时GatewayProxyFactoryBean
如前所述,最好不要依赖 Spring Integration API——包括网关类。
因此,Spring 集成提供了GatewayProxyFactoryBean生成任意接口的代理,并在内部调用下面所示的网关方法。
通过依赖注入,你可以将接口暴露给你的业务方法。
以下示例展示了一个可用于与 Spring 集成交互的接口:
public interface Cafe {
void placeOrder(Order order);
}
Gateway XML 命名空间支持
还提供命名空间支持。 它允许您将接口配置为服务,如下示例所示:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
定义此配置后,咖啡服务现在可以注入到其他 Beans 中,调用该代理实例上方法的代码咖啡屋接口不了解 Spring 集成 API。
请参见“样本”附录,其中有一个使用了网关element(在Cafe演示中)。
前述配置中的默认设置应用到网关接口上的所有方法。 如果未指定回复超时,呼叫线程将等待回复30秒。 当没有响应到达时,请查看网关行为。
默认值可以针对各个方法被覆盖。 参见带注释和XML的网关配置。
设置默认回复频道
通常,你不需要具体说明默认回复信道,因为网关会自动创建一个临时的匿名回复通道,在那里监听回复。
然而,在某些情况下,你可能会要求定义一个默认回复信道(或回复信道配合适配器网关,如HTTP、JMS等)。
为了提供一些背景,我们简要讨论了传送门的一些内部运作。
网关创建一个临时的点对点响应信道。
它是匿名的,并以以下名称添加到消息头中,回复频道.
提供显式默认回复信道 (回复信道通过远程适配器网关,你可以指向一个发布-订阅频道,这个名称是因为你可以向它添加多个订阅者。
在内部,Spring Integration 搭建了临时回复频道以及明确定义的默认回复信道.
假设你希望回复不仅发送到网关,还要发送给其他消费者。 在这种情况下,你需要两样东西:
-
一个你可以订阅的命名频道
-
那个频道应该是发布-订阅-频道
网关默认的策略无法满足这些需求,因为添加到报头的回复信道是匿名且点对点的。
这意味着没有其他订阅者能获得该消息的句号,即使能,频道也具有点对点行为,只有一个订阅者能收到消息。
通过定义默认回复信道你可以指向你选择的频道。
在这种情况下,那是一个发布-订阅-频道.
网关会从它建立一个桥接,连接到存储在头部中的临时匿名回复信道。
你也可以明确提供一个通过拦截器进行监控或审计的回复通道(例如窃听)。 要配置通道拦截器,你需要一个命名通道。
从版本5.4开始,当网关方法返回类型为无效,该框架填充回复频道头作为零通道如果没有明确提供此类头部,则称为 BEAN 引用。
这使得下游流中任何可能的响应都可以被丢弃,满足单向网关契约。 |
带注释和XML的网关配置
请考虑以下例子,它是对前述的扩展咖啡屋通过添加一个接口示例@Gateway注解:
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
这@Header注释允许你添加被解释为消息头的值,如下示例所示:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果你更喜欢用XML方法配置网关方法,可以添加方法如下示例所示,这些元素对网关配置进行了调整:
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
你也可以用XML为每个方法调用提供单独的头部。
如果你想设置的头部是静态的,并且不想通过以下方式嵌入到网关的方法签名中,这可能很有用@Header附注。
例如,在贷款经纪人的例子中,我们希望根据发起的请求类型(单一报价或全部报价)来影响贷款报价的汇总方式。
通过评估调用的网关方法来确定请求类型,虽然可能,但会违反关注点分离范式(该方法属于Java工件)。
然而,在消息结构中,用消息头表达你的意图(元信息)是很自然的。
以下示例展示了如何为两种方法中的每种添加不同的消息头:
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在前述示例中,根据网关的方法,'RESPONSE_TYPE'头部设置了不同的值。
例如,如果你指定请求频道在<int:method/>以及在@Gateway注释,注释值获胜。 |
如果在XML中指定了一个无参数网关,且接口方法同时具有@Payload和@Gateway注释(带有payloadExpression或者有效载荷表达式在<int:method/>元素),该@Payload价值被忽视了。 |
表达式与“全局”头部
这<头/>元素支持表达作为替代方案值.
通过计算SpEL表达式来确定头部的值。
从5.2版本开始,#root评估上下文的对象是MethodArgsHolder跟getMethod()和getArgs()访问。
例如,如果你想在简单的方法名上路由,可以添加一个包含以下表达式的头部:method.name.
这java.reflect.方法不可序列化。
一个表达式为方法如果后来将消息序列化,则会丢失。
因此,你可能希望使用method.name或method.toString()在那些情况下。
这toString()方法提供字符串方法的表示,包括参数类型和返回类型。 |
自3.0版本起,<默认头/>可以定义元素,为网关生成的所有消息添加头部,无论调用何种方法。
为方法定义的特定头优先于默认头。
这里为某个方法定义的特定头部覆盖了任何@Header服务接口中的注释。
然而,默认头部不会覆盖任何@Header服务接口中的注释。
网关现在还支持默认有效载荷表达式,该方法适用于所有方法(除非被覆盖)。
将方法参数映射到消息
使用前一节的配置技术可以控制方法参数如何映射到消息元素(有效载荷和头部)。 当不使用显式配置时,会使用特定的约定来执行映射。 在某些情况下,这些约定无法确定哪个参数是有效载荷,哪个应映射到头部。 请考虑以下例子:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,惯例是将第一个参数映射到有效载荷(只要它不是地图)而第二个参数的内容则成为头部。
在第二种情况下(或第一种,当参数 的论证为东西1是地图框架无法确定哪个参数应为有效载荷。
因此,映射失败了。
这通常可以通过有效载荷表达式一个@Payload注释,或@Headers注解。
或者(当惯例失效时),你可以完全负责将方法调用映射到消息。
为此,实现一个MethodArgsMessageMapper并向<网关/>通过使用映射属性。
映射器映射 aMethodArgsHolder,这是一个简单类,它包裹了java.reflect.方法实例和一个对象[]包含了论点。
当提供自定义映射器时,默认有效载荷表达式属性和<默认头/>网关不允许有元素。
同样,有效载荷表达式属性和<头/>在任何情况下都不允许有元素<方法/>元素。
映射方法参数
以下示例展示了方法参数如何映射到消息,并展示了一些无效配置的示例:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
| 1 | 注意,在这个例子中,SpEL 变量,#this,指的是参数——在此例中,是s. |
XML的对应物看起来有些不同,因为没有#this方法论证的上下文。
然而,表达式可以通过使用args该性质MethodArgsHolder根对象(详见表达式和“全局”头部),如下示例所示:
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway注解
从4.0版本开始,网关服务接口可以用@MessagingGateway注释代替定义<网关 />用于配置的XML元素。
以下两组示例比较了两种配置同一网关的方法:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与XML版本类似,当Spring Integration在组件扫描中发现这些注释时,会创建代理通过其消息基础设施实现。
要执行此扫描并注册豆子定义在应用上下文中,添加@IntegrationComponentScan注释到@Configuration类。
标准@ComponentScan基础设施不涉及接口。
因此,我们引入了这个习俗@IntegrationComponentScan逻辑 以求得@MessagingGateway接口和寄存器的注释GatewayProxyFactoryBean为他们准备实例。
另见注释支持。 |
同时@MessagingGateway注释 你可以用@Profile如果没有激活的 Bean 配置文件,注释以避免创建 Bean。
从6.0版本开始,一个与@MessagingGateway也可以用 a 标记@Primary对相应配置逻辑的注释,适用于任何Spring@Component定义。
从6.0版本开始,@MessagingGateway标准 Spring 可以使用接口@Import配置。
这可以作为@IntegrationComponentScan或手动AnnotationGatewayProxyFactoryBeanBeans定义。
这@MessagingGateway是用 a-meta 注释的@MessageEndpoint自版本起6.0以及名称()属性本质上是别名于@Compnent.value().
这样,网关代理的豆名生成策略会重新对齐扫描和导入组件的标准 Spring 注释配置。
默认注释豆名生成器可以通过AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR或者作为@IntegrationComponentScan.nameGenerator()属性。
如果你没有XML配置,@EnableIntegration至少需要对其中一个进行注释@Configuration类。
看配置和@EnableIntegration更多信息请见。 |
调用无参数方法
当在没有参数的网关接口上调用方法时,默认行为是接收消息来自Pollable频道.
不过,有时你可能想触发无参数方法,以便与下游不需要用户提供参数的其他组件交互,比如触发无参数SQL调用或存储过程。
要实现发送和接收语义,你必须提供有效载荷。
生成有效载荷时,接口上的方法参数并非必需。
你可以选择@Payload注释或有效载荷表达式在方法元素。
以下列表包含了一些有效载荷可能的例子:
-
一个字面上的字符串
-
#gatewayMethod.name
-
new java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例展示了如何使用@Payload注解:
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
你也可以使用@Gateway注解。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果同时存在两个注释(且payloadExpression提供了),@Gateway赢了。 |
另请参见带注释和XML的网关配置。
如果某个方法没有参数和返回值,但包含有效载荷表达式,则被视为仅发送作。
调用默认值方法
网关代理的接口可能具有默认值方法也一样,从5.3版本开始,框架注入了DefaultMethodInvokingMethodInterceptor转为呼叫代理默认值使用 a 的方法java.lang.invoke.MethodHandle采取方式而不是代理。
JDK 的接口,例如java.util.function.函数仍然可以用作网关代理,但他们的默认值由于 Java 内部的安全原因,无法调用MethodHandles.Lookup对JDK类的实例化。
这些方法也可以通过显式@Gateway方法上的注释,或proxyDefaultMethods在@MessagingGateway注释或<网关>XML组件。
错误处理
网关调用可能导致错误。 默认情况下,下游发生的任何错误都会在网关方法调用时“按原样”重新抛出。 例如,考虑以下简单流:
gateway -> service-activator
如果服务激活器调用的服务抛出我的例外(例如),框架将其包裹为消息异常并将传递给服务激活器的消息附加在失败消息财产。
因此,框架执行的任何日志记录都包含了失败的完整上下文。
默认情况下,当异常被网关捕获时,我的例外被拆开并扔给来电者。
你可以配置一个抛出在网关方法声明中设置一个子句,以匹配原因链中的特定异常类型。
例如,如果你想接住一个完整的消息异常在所有关于下游错误原因的消息信息后,你应该拥有类似以下的网关方法:
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励POJO编程,您可能不想让来电者接触消息基础设施。
如果你的网关方法没有抛出子句中,网关会穿越原因树,寻找运行异常这不是消息异常.
如果找不到,框架会抛出消息异常.
如果我的例外在前述讨论中,原因为其他例外以及你的方法抛出某个例外网关进一步展开并抛给呼叫者。
当网关声明时,没有服务接口,一个内部框架接口请求回复交换器被使用。
请考虑以下例子:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在5.0版本之前,这个交换方法没有抛出因此,例外被展开。
如果你用这个界面并想恢复之前的展开行为,请使用自定义服务接口而不是访问原因关于消息异常你自己。
不过,你可能想记录错误而不是传播,或者把异常视为有效的回复(通过映射到符合呼叫者理解的某个“错误消息”合同的消息)。
为此,网关通过支持错误的消息通道,包含对错误信道属性。
在下面的例子中,一个“变换器”生成一个回复消息来自例外:
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
这exceptionTransformer也可以是一个简单的POJO,知道如何创建预期的错误响应对象。
这成为发送给呼叫者的有效载荷。
如果有必要,你可以在这种“错误流”中做更多复杂的作。
可能涉及路由器(包括 Spring Integration 的错误消息异常类型路由器)、过滤,等等。
不过大多数情况下,一个简单的“转换器”就足够了。
或者,你可能只记录异常(或者异步发送)。
如果你提供单向流,任何信息都不会被回传给呼叫者。
如果你想完全抑制异常,可以提供全局的引用零通道(本质上是/dev/null方法)。
最后,如上所述,如果不是错误信道定义后,例外会像往常一样传播。
当你使用@MessagingGateway注释(参见@MessagingGateway'注释),你可以使用 'errorChannel属性。
从5.0版本开始,当你使用带有无效返回类型(单向流),即返回类型错误信道标准中包含参考(如提供)errorChannel每个发送消息的头部。
该特性允许基于标准的下游异步流执行者频道配置(或 a队列通道),以覆盖默认全局errorChannel异常发送行为。
以前你必须手动指定一个errorChannel带有@GatewayHeader注释或<标题>元素。
这错误信道财产被忽视无效具有异步流的方法。
相反,错误消息被发送到默认状态errorChannel.
通过简单的POJI网关暴露消息系统有好处,但“隐藏”底层消息系统的真实面是有代价的,因此你应考虑一些事项。
我们希望 Java 方法能尽快返回,并且在调用者等待返回(无论是 void、返回值还是抛出异常)期间,不会无限期卡顿。
当常规方法作为消息系统前的代理时,我们必须考虑底层消息可能存在异步性质。
这意味着网关发起的消息可能被过滤器丢弃,无法到达负责回复的组件。
某些服务激活方法可能导致异常,因此不会回复(因为我们不生成空消息)。
换句话说,多种情况可能导致回复消息永远不会收到。
这在消息系统中是完全正常的。
不过,请考虑对网关方法的影响。
网关的方法输入参数被整合进消息并发送到下游。
回复消息会被转换为网关方法的返回值。
所以你可能想确保每次网关调用都有一个回复消息。
否则,你的网关方法可能永远不会恢复,并且无限期卡住回复-超时设置为负值。
解决这种情况的一种方法是使用异步网关(本节后面会详细说明)。
另一种处理方式是依赖默认设置回复-超时作为30秒。
这样,网关就不会超过指定时间回复-超时如果超时过了,则返回“null”。
最后,你可以考虑在服务激活器上设置下游标志,比如“要求回复”,或在过滤器上设置“拒绝时抛出异常”。
这些选项在本章最后一部分有更详细的讨论。 |
如果下游流返回错误消息其有效载荷(a可投掷)被视为正规下游误差。
如果存在错误信道配置完成后,它会被送入错误流。
否则,有效载荷会抛给网关调用者。
同样,如果误差流在错误信道返回一个错误消息,其有效载荷被抛给呼叫者。
同样适用于任何带有可投掷有效载荷。
这在异步情况下非常有用,当你需要传播例外直接打电话。
为此,你可以选择返回例外(作为答从某个服务中)或者扔掉它。
通常,即使是异步流,框架也会负责将下游流抛出的异常传播回网关。
TCP客户端-服务器复用示例展示了两种技术,用于将异常返回给调用者。
它通过使用聚合跟小组暂停(参见聚合器和组超时)以及一个MessagingTimeoutException在弃牌流程中回复。 |
网关超时
网关有两个超时属性:请求超时和回复Timeout.
请求超时仅在通道可以阻塞(例如有界)时才适用队列通道那是满的)。
这回复Timeout值是网关等待回复或返回的时间零.
它默认是无限。
超时可以设置为网关上所有方法的默认值(defaultRequestTimeout和默认回复超时)或在消息网关界面注释。
各个方法可以覆盖这些默认值(在<方法/>子元素)或在@Gateway注解。
从5.0版本开始,超时可以定义为表达式,如下示例所示:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文有豆解决器(使用@someBean以指代其他Beans),以及args数组属性来自#root物品可查。
有关该根对象的更多信息,请参见表达式和“全局”头部。
在使用 XML 配置时,超时属性可以是长值或 SpEL 表达式,如下示例所示:
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
异步网关
作为一个模式,消息网关提供了一种很好的方式,可以隐藏消息专用代码,同时还能展示消息系统的全部功能。
如前所述,该GatewayProxyFactoryBean提供了一种方便的方式,通过服务接口暴露代理,基于POJO访问基于你自己域中的对象、原语/字符串或其他对象的消息系统。
然而,当网关通过简单的 POJO 方法返回值时,这意味着对于每个请求消息(在方法被调用时生成)必须有一个回复消息(当方法返回时生成)。
由于消息系统本质上是异步的,你可能无法保证“每个请求都会有回复”的合同。Spring Integration 2.0引入了异步网关支持,这为你在不知道是否需要回复或回复需要多长时间时,方便地发起流程。
为了处理这类情况,Spring Integration 使用java.util.concurrent.Future实例支持异步网关。
从XML配置来看,没有变化,你仍然以定义普通网关的方式定义异步网关,如下示例所示:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
然而,网关接口(服务接口)略有不同,具体如下:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如前例所示,网关方法的返回类型是前途.
什么时候GatewayProxyFactoryBean看到网关方法的返回类型是前途它会立即通过使用异步任务执行器.
这就是所有差异的全部。
调用此类方法时总是立即返回前途实例。
然后你可以与前途按自己的节奏获得结果,取消,等等。
此外,和其他任何前途实例,调用get()可能会显示超时、执行异常等。
以下示例展示了如何使用前途该通道来自异步网关:
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
更详细的示例,请参见 Spring Integration 示例中的异步网关示例。
异步任务执行器
默认情况下,GatewayProxyFactoryBean使用org.springframework.core.task.SimpleAsyncTaskExecutor提交内部文件时异步调用任务任何返回类型为 的网关方法实例前途.
然而,异步执行器属性<网关/>element 的配置允许你引用任意实现java.util.concurrent.Executor在 Spring 应用上下文中提供。
(默认)SimpleAsyncTaskExecutor两者都支持。前途和完成未来返回类型。
看完成未来.
即使存在默认执行程序,提供外部执行程序通常很有用,这样你可以在日志中识别其线程(使用XML时,线程名称基于执行器的豆名),如下示例所示:
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果你想退货,是另一辆前途实现时,你可以提供自定义执行器,或者完全禁用该执行器并返回前途在下游流的回复消息中,有效载荷。
要禁用执行器,请将其设置为零在GatewayProxyFactoryBean(通过使用setAsyncTaskExecutor(null)).
在配置 XML 网关时,使用async-executor=”.
在使用以下设备进行配置时@MessagingGateway注释,使用类似以下代码:
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果回流类型是特定的混凝土前途实现或其他未被配置执行器支持的子接口,流在调用者的线程上运行,且流必须返回回复消息负载中的所需类型。 |
完成未来
从4.2版本开始,网关方法现在可以回归CompletableFuture<?>.
返回此类产品有两种作模式:
-
当提供异步执行程序且返回类型恰好为
完成未来(非子类),框架在执行程序上执行任务,并立即返回完成未来对来电者。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)用来创造未来。 -
当异步执行器被明确设置为
零返回类型为完成未来或者返回类型是 的子类完成未来,流程在呼叫者的线程中被调用。 在这种情况下,下游流预计将返回完成未来是合适的类型。
这org.springframework.util.concurrent.ListenableFuture自 Spring Framework 起已被弃用6.0.
现在建议迁移到完成未来该系统提供类似的处理功能。 |
使用场景
在以下情景中,调用线程立即返回CompletableFuture<发票>当下游流量响应网关(带有发票目的)。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在以下情景中,调用线程返回时CompletableFuture<发票>当下游流将其作为网关回复的有效载荷时。
当发票准备好时,必须完成其他流程。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在以下情景中,调用线程返回时CompletableFuture<发票>当下游流将其作为网关回复的有效载荷时。
当发票准备好时,必须完成其他流程。
如果调试启用日志时,会发出日志条目,表示异步执行器在此场景下无法使用。
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
完成未来实例可用于对回复进行额外作,如下示例所示:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
反应器单
从5.0版本开始,GatewayProxyFactoryBean允许通过网关接口方法使用Project Reactor,使用单核细胞增<症>返回类型。
内部异步调用任务被包裹在Mono.fromCallable().
一个单可以用来稍后检索结果(类似于未来<?>),或者你可以通过调用调度器调用消费者当结果返回到网关时。
这单不会被框架立即冲刷。
因此,底层消息流不会在网关方法返回前启动(如同未来<?> 执行者任务)。
流动开始于单订阅。
或者,单(成为“可组合体”)可能是反应堆流的一部分,当订阅()与整个通量.
以下示例展示了如何使用Project Reactor创建网关: |
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
其中此类网关可用于处理通量数据:
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
另一个使用 Project Reactor 的例子是一个简单的回调场景,如下示例所示:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
呼叫线程继续,且handleInvoice()当流动完成时被召唤。
更多信息请参见 Kotlin 协程。
下游流回流异步类型
如同异步任务执行器上面提到,如果你希望下游组件返回带有异步有效载荷的消息(前途,单以及其他),你必须显式地将异步执行器设置为零(或在使用 XML 配置时)。
随后调用线程调用该流程,结果可以稍后检索。""
异步无效返回类型
消息网关方法可以声明如下:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但下游例外不会被传回呼叫者。
为确保下游流调用和异常传播的异步行为,从6.0版本开始,框架支持以下未来<虚空>和单<虚空>返回类型。
其使用场景类似于之前描述的普通发送后遗忘行为无效返回类型,但不同的是,流执行是异步进行并返回的前途(或单)是完备的,带有零或根据发送行动结果。
如果未来<虚空>是精确的下游流回应,然后asyncExecutor网关的选项必须设置为空(注释常数.NULL对于@MessagingGateway配置)以及发送部分在生产线程中执行。
回复方案取决于下游流的配置。
这样,目标应用程序就能生成未来<虚空>正确回复。
这单用例已经超出框架线程控制范围,所以设置asyncExecutor用“null”(无效)是没有意义的。
那里单<虚空>由于请求-回复,网关作必须配置为单核细胞增多症<?>网关方法的返回类型。 |
无响应时网关行为
如前所述,网关通过POJO方法调用提供了与消息系统交互的便捷方式。 然而,典型的方法调用通常预期总是返回(即使有例外),但可能并不总是一一映射到消息交换(例如,回复消息可能未到达——相当于方法未返回)。
本节其余部分将涵盖各种场景以及如何让门户更可预测地表现。
某些属性可以配置以使同步网关行为更可预测,但其中一些可能并不总是如你所预期的那样工作。
其中一个是回复-超时(在方法层面或默认-回复-超时在门户层面)。
我们检查回复-超时属性,以观察它在不同场景下如何影响同步网关的行为。
我们考察单线程场景(所有下游组件通过直接通道连接)和多线程场景(例如,在下游某处可能有可轮询或执行通道,突破了单线程边界)。
下游的长期运行过程
- 同步网关,单线程
-
如果下游组件仍在运行(可能是因为无限循环或服务缓慢),则设置
回复-超时没有影响,网关方法调用直到下游服务退出(通过返回或抛出异常)才返回。 - 同步网关,多线程
-
如果下游组件仍在运行(可能是因为无限循环或服务缓慢),在多线程消息流中,设置
回复-超时通过允许在超时后返回网关方法调用,因为GatewayProxyFactoryBean在回复频道进行投票,等待消息,直到超时结束。 然而,如果超时时间在实际回复产生前就已到达,网关方法可能会返回“空”。 你应该明白,回复消息(如果生成)是在网关方法调用返回后才发送到回复通道的,因此你必须意识到这一点,并以此为前提设计流程。
另见errorOnTimeout用 Property 来MessageTimeoutException而不是回去零,当发生超时时。
下游组件返回“空”
- 同步网关 — 单线程
-
如果下游组件返回“null”且
回复-超时被配置为负值,网关方法调用将无限期挂起,除非需要回复属性已在下游组件(例如服务激活器)上设置,可能返回“空”。 在这种情况下,会抛出一个异常并传播到网关。 - 同步网关——多线程
-
行为和之前的情况一样。
下游组件返回签名为“无效”,而网关方法签名为非无效
- 同步网关 — 单线程
-
如果下游的某个分量返回“void”,并且
回复-超时被配置为负值,网关方法调用会无限期挂起。 - 同步网关——多线程
-
行为和之前的情况一样。
下游组件导致运行时异常
- 同步网关 — 单线程
-
如果下游组件抛出运行时异常,该异常会通过错误消息传播回网关并重新抛出。
- 同步网关——多线程
-
行为和之前的情况一样。
你应该明白,默认情况下,回复-超时是无界的。
因此,如果你设置回复-超时负值时,你的网关方法调用可能会无限期卡住。
因此,为了确保你分析你的流程,并且如果哪怕有哪怕一点点可能发生这些情况,你应该设置回复-超时赋予一个“安全”值。
是的30默认是秒。
更好的是,你可以设置需要回复将下游组件属性设置为“true”,以确保响应及时,因为一旦下游组件内部返回空,则会立即抛出异常。
不过,你也应该意识到,有些情况(见第一个例子)是这样的回复-超时没用。
这意味着分析你的消息流,决定何时使用同步网关而非异步网关也很重要。
如前所述,后一种情况是定义返回的网关方法的问题前途实例。
这样你就能保证收到返回值,并且对调用结果有更细致的控制。
另外,处理路由器时,你应该记住设置分辨率要求如果 Roots 为 “true”,路由器如果无法解析某个特定信道,会抛出异常。
同样,处理过滤器时,你可以设置拒绝时抛出异常属性。
在这两种情况下,最终的流表现得像包含一个带有“requires-reply”属性的服务激活器。
换句话说,它有助于确保网关方法调用的及时响应。 |
| 你应该明白,计时器从线程返回网关时开始——也就是流量完成或消息被传递给另一个线程。 此时,呼叫线程开始等待回复。 如果流程完全同步,回复即可立即获得。 对于异步流,线程会等待至此时间。 |
从6.2版本开始,errorOnTimeout内部的性质MethodInvocationGateway扩展MessagingGatewaySupport在@MessagingGateway和GatewayEndpointSpec.
该选项的含义与终端摘要章节末尾所述的任何入站网关完全相同。
换句话说,将此选项设置为true,将导致MessageTimeoutException被从发送和接收网关作中跳出,而不是返回零当接收超时耗尽时。
看集成流程作为Gateway时期在Java DSL章节中,关于定义网关的选项集成流程.