|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
零消息支持
Spring Integration 提供了支持应用程序中 ZeroMQ 通信的组件。 该实现基于 JeroMQ 库广受支持的 Java API。 所有组件都封装了 ZeroMQ 套接字的生命周期,并在内部为其管理线程,从而使得与这些组件的交互无锁且线程安全。
您需要将以下依赖项包含到您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.4.10"
ZeroMQ 代理
ZeroMqProxy 是一个面向 Spring 的内置 ZMQ.proxy() 函数 的包装器。
它封装了套接字的生命周期和线程管理。
该代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。
除了标准的 ZContext 之外,它还需要使用一种众所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。
这样,就可以为代理的前端和后端使用适当配对的 ZeroMQ 套接字类型。
详情参见 ZeroMqProxy.Type。
The ZeroMqProxy 实现 SmartLifecycle 以创建、绑定和配置套接字,并从 Executor(如果存在)在专用线程中启动 ZMQ.proxy()。
前端和后端套接字的绑定通过 tcp:// 协议在所有可用的网络接口上完成,并使用提供的端口。
否则,它们将绑定到随机端口,这些端口稍后可通过相应的 getFrontendPort() 和 getBackendPort() API 方法获取。
控制套接字作为具有线程间传输的 SocketType.PAIR 在 "inproc://" + beanName + ".control" 地址上暴露;可通过 getControlAddress() 获取它。
它应与来自另一个 SocketType.PAIR 套接字的同一应用程序配合使用,以发送 ZMQ.PROXY_TERMINATE、ZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。
当调用 stop() 时,ZeroMqProxy 会执行 ZMQ.PROXY_TERMINATE 命令,以终止其生命周期中的 ZMQ.proxy() 循环并优雅地关闭所有绑定的套接字。
setExposeCaptureSocket(boolean) 选项会使此组件绑定一个额外的线程间套接字,该套接字为 SocketType.PUB,用于捕获并发布前端与后端套接字之间的所有通信,正如 ZMQ.proxy() 实现中所陈述的那样。
此套接字绑定到 "inproc://" + beanName + ".capture" 地址,且不期望任何用于过滤的特定订阅。
前端和后端套接字可以通过附加属性进行自定义,例如读/写超时或安全性。
这种自定义功能分别通过 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>) 和 setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回调提供。
The ZeroMqProxy 可以像这样作为一个简单的 bean 提供:
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客户端节点应通过tcp://连接到此代理的主机,并使用它们各自感兴趣的相应端口。
ZeroMQ 消息通道
The ZeroMqChannel 是一个使用一对 ZeroMQ 套接字连接发布者和订阅者以实现消息交互的 SubscribableChannel。
它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);也可以用作本地线程间通道(使用 PAIR 个套接字),此时不提供 connectUrl。
在分布式模式下,它必须连接到由外部管理的 ZeroMQ 代理,以便与连接到同一代理的其他类似通道交换消息。
连接 URL 选项是标准的 ZeroMQ 连接字符串,包含协议、主机以及用冒号分隔的前端和后端套接字的端口对。
为方便起见,如果该通道与代理配置在同一个应用程序中,可以使用 ZeroMqProxy 实例代替连接字符串来提供该通道。
发送和接收套接字都在各自专用的线程中管理,使该通道具有并发友好性。
这样我们就可以在不同的线程之间发布和消费来自/去向 ZeroMqChannel 的数据,而无需同步。
默认情况下,ZeroMqChannel 使用 EmbeddedJsonHeadersMessageMapper 通过 Jackson JSON 处理器将 Message(包括请求头)从/到 byte[] 进行(反)序列化。
此逻辑可通过 setMessageMapper(BytesMessageMapper) 进行配置。
发送和接收套接字可以通过相应的 setSendSocketConfigurer(Consumer<ZMQ.Socket>) 和 setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回调函数,为任意选项(如读取/写入超时、安全性等)进行自定义。
ZeroMqChannel的内部逻辑基于通过Project Reactor的响应式流以及Flux和Mono操作符实现。
这提供了更简便的线程控制,并允许对通道的发布和消费进行无锁并发处理。
本地PUB/SUB逻辑被实现为Flux.publish()操作符,以允许该通道的所有本地订阅者接收相同的已发布消息,就像它们是PUB套接字的分布式订阅者一样。
以下是 ZeroMqChannel 配置的一个简单示例:
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 入站通道适配器
The ZeroMqMessageProducer 是一个具有响应式语义的 MessageProducerSupport 实现。
它以非阻塞方式不断从 ZeroMQ 套接字读取数据,并将消息发布到一个无限的 Flux,该通道可由 FluxMessageChannel 订阅,或者在输出通道不具有响应性时通过 start() 方法显式订阅。
当套接字上未接收到数据时,将应用一个 consumeDelay(默认为 1 秒),然后才进行下一次读取尝试。
只有 SocketType.PAIR、SocketType.PULL 和 SocketType.SUB 被 ZeroMqMessageProducer 所支持。
此组件可以连接到远程套接字,或使用提供的或随机端口绑定到 TCP 协议。
实际端口可以在该组件启动且 ZeroMQ 套接字绑定后,通过 getBoundPort() 获取。
套接字选项(例如安全性或写入超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。
如果将 receiveRaw 选项设置为 true,则从套接字消费的 ZMsg 将作为生成的 Message 的有效载荷原样发送:解析和转换 ZMsg 的工作由下游流程负责。
否则,使用 InboundMessageMapper 将消费的数据转换为 Message。
如果接收到的 ZMsg 是多帧的,则第一帧被视为该 ZeroMQ 消息发布的 ZeroMqHeaders.TOPIC 头部。
如果将 unwrapTopic 选项设置为 false,则传入的消息被视为由两个帧组成:主题和 ZeroMQ 消息。
否则,默认情况下,ZMsg 被视为由三个帧组成:第一个帧包含主题,最后一个帧包含消息,中间是一个空帧。
使用 SocketType.SUB,ZeroMqMessageProducer 将使用提供的 topics 选项进行订阅;默认订阅所有。
可以在运行时通过 subscribeToTopics() 和 unsubscribeFromTopics() @ManagedOperations 调整订阅。
以下是 ZeroMqMessageProducer 配置的一个示例:
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 出站通道适配器
The ZeroMqMessageHandler 是一个 ReactiveMessageHandler 实现,用于将发布消息发布到 ZeroMQ 套接字。
仅支持 SocketType.PAIR、SocketType.PUSH 和 SocketType.PUB。
此组件可以使用提供的端口或随机端口连接到远程套接字或通过 TCP 协议进行绑定。
实际端口可以在该组件启动且 ZeroMQ 套接字绑定后通过 getBoundPort() 获取。
当使用SocketType.PUB时,topicExpression会针对请求消息进行求值,如果该值为非空,则将主题帧注入到 ZeroMQ 消息中。
订阅方(SocketType.SUB)必须在解析实际数据之前先接收主题帧。
如果将 wrapTopic 选项设置为 false,则 ZeroMQ 消息帧将在注入的主题(topic)之后发送(如果存在该主题)。
默认情况下,在主题和消息之间会发送一个额外的空帧。
当请求消息的负载为 ZMsg 时,不执行任何转换或主题提取:ZMsg 将原样发送到套接字,且不会被销毁以用于可能的进一步复用。
否则,将使用 OutboundMessageMapper<byte[]> 将请求消息(或其负载)转换为 ZeroMQ 帧进行发布。
默认情况下,会使用一个由 ConfigurableCompositeMessageConverter 提供的 ConvertingBytesMessageMapper。
套接字选项(例如安全性或写入超时)可通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。
以下是连接套接字的 ZeroMqMessageHandler 配置示例:
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
以下是一个绑定到所提供端口的 ZeroMqMessageHandler 配置示例:
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ Java DSL 支持
The spring-integration-zeromq 通过 ZeroMq 工厂和 IntegrationComponentSpec 实现,为上述组件提供了一个便捷的 Java DSL 流畅 API。
这是 Java DSL 的示例,用于 ZeroMqChannel:
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道适配器是:
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道适配器为:
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}