此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

流支持

在许多情况下,应用程序数据是从流中获取的。 不建议将流的引用作为消息负载发送给消费者。 相反,消息应通过从输入流读取的数据来创建,并且消息负载应逐个写入输出流。spring-doc.cadn.net.cn

此依赖项是项目所必需的:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>7.1.0-SNAPSHOT</version>
</dependency>
implementation "org.springframework.integration:spring-integration-stream:7.1.0-SNAPSHOT"

从流中读取

Spring Integration 为流提供了两个适配器。 ByteStreamReadingMessageSourceCharacterStreamReadingMessageSource都实现了MessageSource。 通过在 channel-adapter 元素中配置其中一个,可以配置轮询周期,并且消息总线可以自动检测并调度它们。 字节流版本需要一个InputStream,而字符流版本需要一个Reader作为单个构造函数参数。 ByteStreamReadingMessageSource还接受'bytesPerMessage'属性,以确定它尝试将多少字节读入每个Message。 默认值为1024。 以下示例创建一个输入流,该流创建的每条消息包含 2048 个字节:spring-doc.cadn.net.cn

<bean class="org.springframework.integration.stream.inbound.ByteStreamReadingMessageSource">
  <constructor-arg ref="someInputStream"/>
  <property name="bytesPerMessage" value="2048"/>
</bean>

<bean class="org.springframework.integration.stream.inbound.CharacterStreamReadingMessageSource">
  <constructor-arg ref="someReader"/>
</bean>

The CharacterStreamReadingMessageSource 将读取器封装在 BufferedReader 中(如果它尚未是的话)。 您可以在第二个构造函数参数中设置缓冲读取器使用的缓冲区大小。 从版本 5.0 开始,第三个构造函数参数 (blockToDetectEOF) 控制 CharacterStreamReadingMessageSource 的行为。 当为 false(默认值)时,receive() 方法会检查读取器是否 ready(),如果不是则返回 null。 在这种情况下不会检测到 EOF(文件结束)。 当设置为 true 时,receive() 方法会阻塞直到有数据可用或在底层流上检测到 EOF。 当检测到 EOF 时,将发布一个 StreamClosedEvent(应用程序事件)。 您可以通过实现 ApplicationListener<StreamClosedEvent> 的 Bean 来消费此事件。spring-doc.cadn.net.cn

为了便于检测 EOF,轮询线程在 receive() 方法中阻塞,直到有数据到达或检测到 EOF。
一旦检测到 EOF,轮询器将继续在每个轮询中发布事件。 应用程序监听器可以停止适配器以防止这种情况发生。 该事件在轮询器线程上发布。 停止适配器会导致线程被中断。 如果您计划在停止适配器后执行某些可中断的任务,则必须在不同的线程上执行 stop(),或者为这些下游活动使用不同的线程。 请注意,向 QueueChannel 发送消息是可中断的,因此,如果您希望从监听器发送消息,请在停止适配器之前执行。

这有助于“管道”或将数据重定向到stdin,如下两个示例所示:spring-doc.cadn.net.cn

cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt

这种方法允许应用程序在管道关闭时停止。spring-doc.cadn.net.cn

提供了四个便捷的工厂方法:spring-doc.cadn.net.cn

public static final CharacterStreamReadingMessageSource stdin() { ... }

public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }

public static final CharacterStreamReadingMessageSource stdinPipe() { ... }

public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }

写入流

对于目标流,您可以使用两种实现中的任意一种:ByteStreamWritingMessageHandlerCharacterStreamWritingMessageHandler。 每种实现都需要一个构造函数参数(OutputStream 用于字节流,或 Writer 用于字符流),并且每种都提供了一个添加可选'bufferSize'的第二个构造函数。 由于这两种实现最终都实现了 MessageHandler 接口,因此您可以像 通道适配器 中所述的那样,在 channel-adapter 配置中引用它们。spring-doc.cadn.net.cn

<bean class="org.springframework.integration.stream.outbound.ByteStreamWritingMessageHandler">
  <constructor-arg ref="someOutputStream"/>
  <constructor-arg value="1024"/>
</bean>

<bean class="org.springframework.integration.stream.outbound.CharacterStreamWritingMessageHandler">
  <constructor-arg ref="someWriter"/>
</bean>

流命名空间支持

Spring Integration 定义了一个命名空间,以减少与流相关的通道适配器所需的配置。 使用它需要以下架构位置:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration/stream
      https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

以下代码片段展示了支持配置入站通道适配器的不同配置选项:spring-doc.cadn.net.cn

<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>

<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>

从版本 5.0 开始,您可以设置 detect-eof 属性,该属性会设置 blockToDetectEOF 属性。 有关更多信息,请参阅 从流中读取spring-doc.cadn.net.cn

要配置出站通道适配器,您也可以使用命名空间支持。 以下示例展示了出站通道适配器的不同配置方式:spring-doc.cadn.net.cn

<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
    channel="testChannel"/>

<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
    channel="testChannel"/>

<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>

<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
    channel="testChannel"/>