如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

流支持

在许多情况下,应用程序数据是从流中获取的。 不建议将流的引用作为消息负载发送给消费者。 相反,消息应通过从输入流读取的数据来创建,并且消息负载应逐个写入输出流。spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:6.4.10"

从流中读取

Spring Integration 为流提供了两个适配器。 ByteStreamReadingMessageSourceCharacterStreamReadingMessageSource 都实现了 MessageSource。 通过在 channel-adapter 元素中配置其中一个,可以配置轮询周期,消息总线也可以自动检测并调度它们。 字节流版本需要一个 InputStream,字符流版本需要一个 Reader 作为单个构造函数参数。 ByteStreamReadingMessageSource 还接受 'bytesPerMessage' 属性,用于确定它尝试将多少字节读入每个 Message。 默认值为 1024。 以下示例创建一个输入流,该流创建的每条消息包含 2048 个字节:spring-doc.cadn.net.cn

<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
  <constructor-arg ref="someInputStream"/>
  <property name="bytesPerMessage" value="2048"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
  <constructor-arg ref="someReader"/>
</bean>

The CharacterStreamReadingMessageSource将阅读器包装在BufferedReader中(如果它还不是的话)。 您可以在第二个构造函数参数中设置缓冲阅读器使用的缓冲区大小。 从5.0版本开始,第三个构造函数参数(blockToDetectEOF)控制CharacterStreamReadingMessageSource的行为。 当false(默认值)时,receive()方法检查阅读器是否ready(),如果不是则返回null。 在这种情况下不会检测到EOF(文件结束)。 当true时,receive()方法会阻塞直到数据可用或在底层流上检测到EOF。 当检测到EOF时,将发布一个StreamClosedEvent(应用程序事件)。 您可以通过实现ApplicationListener<StreamClosedEvent>的bean来消费此事件。spring-doc.cadn.net.cn

为了便于检测 EOF,轮询线程在 receive() 方法中阻塞,直到有数据到达或检测到 EOF。
一旦检测到 EOF,轮询器将继续在每个轮询中发布事件。 应用程序监听器可以停止适配器以防止这种情况发生。 该事件在轮询器线程上发布。 停止适配器会导致线程被中断。 如果您计划在停止适配器后执行某些可中断的任务,则必须在不同的线程上执行 stop(),或者为这些下游活动使用不同的线程。 请注意,向 QueueChannel 发送消息是可中断的,因此,如果您希望从监听器发送消息,请在停止适配器之前执行。

这有助于将数据“管道”或重定向到 stdin,如下两个示例所示:spring-doc.cadn.net.cn

cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt

这种方法允许应用程序在管道关闭时停止。spring-doc.cadn.net.cn

提供了四个便捷的工厂方法:spring-doc.cadn.net.cn

public static final CharacterStreamReadingMessageSource stdin() { ... }

public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }

public static final CharacterStreamReadingMessageSource stdinPipe() { ... }

public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }

写入流

对于目标流,您可以使用两种实现中的任意一种:ByteStreamWritingMessageHandlerCharacterStreamWritingMessageHandler。 每种实现都需要一个构造函数参数(OutputStream 用于字节流,或 Writer 用于字符流),并且每种都提供了一个添加可选'bufferSize'的第二个构造函数。 由于这两种实现最终都实现了 MessageHandler 接口,因此您可以像 通道适配器 中所述的那样,在 channel-adapter 配置中引用它们。spring-doc.cadn.net.cn

<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
  <constructor-arg ref="someOutputStream"/>
  <constructor-arg value="1024"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
  <constructor-arg ref="someWriter"/>
</bean>

流命名空间支持

Spring Integration 定义了一个命名空间,以减少与流相关的通道适配器所需的配置。 使用它需要以下架构位置:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration/stream
      https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

以下代码片段展示了支持配置入站通道适配器的不同配置选项:spring-doc.cadn.net.cn

<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>

<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>

从版本 5.0 开始,您可以设置 detect-eof 属性,该属性会设置 blockToDetectEOF 属性。 有关更多信息,请参阅 从流中读取spring-doc.cadn.net.cn

要配置出站通道适配器,您也可以使用命名空间支持。 以下示例展示了出站通道适配器的不同配置方式:spring-doc.cadn.net.cn

<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
    channel="testChannel"/>

<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
    channel="testChannel"/>

<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>

<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
    channel="testChannel"/>