如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

TCP 连接工厂

概述

对于 TCP,底层连接的配置通过使用连接工厂提供。 提供了两种类型的连接工厂:客户端连接工厂和服务器端连接工厂。 客户端连接工厂建立出站连接。 服务器端连接工厂监听入站连接。spring-doc.cadn.net.cn

出站通道适配器使用客户端连接工厂,但您也可以将客户端连接工厂的引用提供给入站通道适配器。 该适配器接收通过出站适配器创建的连接所收到的任何传入消息。spring-doc.cadn.net.cn

入站通道适配器或网关使用服务器连接工厂。 (实际上,没有它连接工厂无法工作)。 您也可以将服务器连接工厂的引用提供给出站适配器。 然后,您可以使用该适配器在同一连接上发送对传入消息的回复。spring-doc.cadn.net.cn

回复消息仅当包含由连接工厂插入到原始消息中的 ip_connectionId 标头时,才会路由到该连接。
这是当入站和出站适配器共享连接工厂时执行的关联消息的最大范围。 这种共享允许通过 TCP 进行异步双向通信。 默认情况下,仅使用 TCP 传输负载信息。 因此,任何消息关联都必须由下游组件(如聚合器或其他端点)执行。 支持传输选定标头是在 3.0 版本中引入的。 更多信息,请参阅 TCP 消息关联

您可以将连接工厂的引用最多提供给每种类型的适配器一个。spring-doc.cadn.net.cn

Spring Integration 提供了使用 java.net.Socketjava.nio.channel.SocketChannel 的连接工厂。spring-doc.cadn.net.cn

下面的示例展示了一个简单的服务器连接工厂,它使用 java.net.Socket 个连接:spring-doc.cadn.net.cn

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

下面的示例展示了一个简单的服务器连接工厂,它使用 java.nio.channel.SocketChannel 个连接:spring-doc.cadn.net.cn

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>
从 Spring Integration 4.2 版本开始,如果服务器配置为监听随机端口(通过将端口设置为 0),则可以使用 getPort() 获取操作系统实际选择的端口。 此外,getServerSocketAddress() 允许您获取完整的 SocketAddress。 有关更多信息,请参阅 TcpServerConnectionFactory 接口的 Javadoc
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

下面的示例展示了一个客户端连接工厂,它使用 java.net.Socket 个连接,并为每条消息创建一个新的连接:spring-doc.cadn.net.cn

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

从 5.2 版本开始,客户端连接工厂支持以秒为单位的 connectTimeout 属性,默认值为 60。spring-doc.cadn.net.cn

消息界定(序列化和反序列化)

TCP 是一种流式协议。 这意味着必须为通过 TCP 传输的数据提供某种结构,以便接收方能够将数据分割成离散的消息。 连接工厂被配置为使用序列化和反序列化器,以在消息负载和通过 TCP 发送的位之间进行转换。 这是通过分别为入站和出站消息提供反序列化器和序列化器来实现的。 Spring Integration 提供了多种标准序列化和反序列化器。spring-doc.cadn.net.cn

ByteArrayCrlfSerializer* 将字节数组转换为后跟回车符和换行符的字节流 (\r\n)。 这是默认的序列化器(和解序列化器),可用于(例如)作为客户端的 telnet。spring-doc.cadn.net.cn

The ByteArraySingleTerminatorSerializer* 将字节数组转换为字节流,后跟单个终止字符(默认为 0x00)。spring-doc.cadn.net.cn

The ByteArrayLfSerializer*将字节数组转换为字节流,后跟一个单行换行符 (0x0a)。spring-doc.cadn.net.cn

The ByteArrayStxEtxSerializer* 将字节数组转换为以 STX(0x02)开头、ETX(0x03)结尾的字节流。spring-doc.cadn.net.cn

The ByteArrayLengthHeaderSerializer 将字节数组转换为以网络字节序(大端)的二进制长度开头的字节流。这是一个高效的反序列化器,因为它无需解析每个字节来查找终止字符序列。它也可用于包含二进制数据的有效载荷。前面的序列化程序仅支持负载中的文本。长度头的默认大小为四个字节(一个 Integer),允许消息最大为 (2^31 - 1) 字节。然而,length 头部对于长度不超过 255 字节的消息可以是一个单字节(无符号),而对于长度不超过 (2^16 - 1) 字节的消息则可以是无符号短整型(2 字节)。如果你需要其他格式的头部,可以子类化 ByteArrayLengthHeaderSerializer 并为 readHeaderwriteHeader 方法提供实现。绝对最大数据大小为 (2^31 - 1) 字节。从版本 5 开始。2、标头值除了有效载荷外还可以包含标头的长度。将 inclusive 属性设置为启用该机制(生产者和消费者必须将其设置为相同的值)。spring-doc.cadn.net.cn

The ByteArrayRawSerializer*将字节数组转换为字节流,且不添加任何额外的消息定界数据。使用此序列化器(和解序列化器),消息的结束由客户端以有序方式关闭套接字来指示。当使用此序列化器时,消息接收将挂起,直到客户端关闭套接字或发生超时。超时不会导致消息生成。当使用该序列化器且客户端为 Spring Integration 应用程序时,客户端必须使用配置了 single-use="true" 的连接工厂。这样做会导致适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。您应仅在通道适配器(而非网关)所使用的连接工厂中使用此序列化器,且这些连接工厂应由入站或出站适配器之一使用,而不能同时被两者使用。另请参阅本节后面的ByteArrayElasticRawDeserializer。然而,自 5.0 版本起。2,出站网关有一个新属性 closeStreamAfterSend;这允许使用原始序列器/反序列器,因为 EOF 信号已发送给服务器,同时保持连接打开以接收回复。spring-doc.cadn.net.cn

在4.2.2版本之前,当使用非阻塞I/O(NIO)时,该序列化器会将读取过程中的超时视为文件结束,并将已读取的数据作为一条消息发出。 这种行为不可靠,不应用于消息定界。 现在,此类情况将被视为异常处理。 如果您确实以这种方式使用了它,可以通过将treatTimeoutAsEndOfMessage构造函数参数设置为true来恢复之前的行为。

这些类都是 AbstractByteArraySerializer 的子类,同时实现了 org.springframework.core.serializer.Serializerorg.springframework.core.serializer.Deserializer。 为了向后兼容,使用任何 AbstractByteArraySerializer 子类进行序列化的连接也会接受一个首先转换为字节数组的 String。 每个序列化和反序列化器都将包含相应格式的数据流转换为字节数组负载。spring-doc.cadn.net.cn

为避免因行为不当的客户端(即不遵守配置序列化器协议的客户端)导致内存耗尽,这些序列化器会限制最大消息大小。 如果传入的消息超过此大小,将抛出异常。 默认最大消息大小为 2048 字节。 您可以通过设置 maxMessageSize 属性来增加该值。 如果您使用默认的序列化器或反序列化器并希望增加最大消息大小,则必须将最大消息大小声明为一个显式 Bean,并设置其 maxMessageSize 属性,同时配置连接工厂以使用该 Bean。spring-doc.cadn.net.cn

本节前文中标记为 * 的类会使用一个中间缓冲区,并将解码后的数据复制到正确大小的最终缓冲区中。 从 4.3 版本开始,您可以通过设置 poolSize 属性来配置这些缓冲区,使这些原始缓冲区可被复用,而不是为每条消息分配并丢弃(这是默认行为)。 将属性设置为负值会创建一个无界池。 如果池是有界的,您还可以设置 poolWaitTimeout 属性(单位为毫秒),之后如果没有可用的缓冲区则会抛出异常。 其默认值为无穷大。 此类异常会导致套接字被关闭。spring-doc.cadn.net.cn

如果您希望在自定义反序列化器中使用相同的机制,您可以扩展 AbstractPooledBufferByteArraySerializer(而不是其超类 AbstractByteArraySerializer),并实现 doDeserialize() 而不是 deserialize()。 缓冲区会自动返回到池中。 AbstractPooledBufferByteArraySerializer 还提供了一个便捷的实用方法:copyToSizedArray()spring-doc.cadn.net.cn

版本 5.0 添加了 ByteArrayElasticRawDeserializer。 这与上面的 ByteArrayRawSerializer 的反序列化侧类似,不同之处在于无需设置 maxMessageSize。 内部使用了一个 ByteArrayOutputStream,允许缓冲区根据需要增长。 客户端必须以有序方式关闭套接字以指示消息结束。spring-doc.cadn.net.cn

此反序列化器仅在对等方可信时才应使用;由于内存不足条件,它容易受到拒绝服务(DoS)攻击。

The MapJsonSerializer 使用 Jackson ObjectMapperMap 和 JSON 之间进行转换。 您可以将此序列化器与 MessageConvertingTcpMessageMapperMapMessageConverter 配合使用,以传输选定的 HTTP 头和以 JSON 格式表示的有效负载。spring-doc.cadn.net.cn

Jackson ObjectMapper 无法在流中界定消息。 因此,MapJsonSerializer 需要委托给另一个序列化器或反序列化器来处理消息界定。 默认情况下,使用 ByteArrayLfSerializer,导致在线路上生成的消息格式为 <json><LF>,但您可以配置它使用其他格式。(下一个示例展示了如何操作。)

最终的默认序列化为 org.springframework.core.serializer.DefaultSerializer,可用于使用 Java 序列化转换可序列化对象。 org.springframework.core.serializer.DefaultDeserializer 用于对包含可序列化对象的流进行入站反序列化。spring-doc.cadn.net.cn

如果您不想使用默认的序列化和反序列化器(ByteArrayCrLfSerializer),则必须在连接工厂上设置 serializerdeserializer 属性。 以下示例展示了如何操作:spring-doc.cadn.net.cn

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

一个使用 java.net.Socket 个连接并在传输层使用 Java 序列化的服务器连接工厂。spring-doc.cadn.net.cn

关于连接工厂上可用属性的完整详细信息,请参阅本节末尾的参考文档spring-doc.cadn.net.cn

默认情况下,不会对入站数据包执行反向 DNS 查找:在 DNS 未配置的环境中(例如 Docker 容器),这可能会导致连接延迟。 若要将 IP 地址转换为用于消息头的主机名,可通过将 lookup-host 属性设置为 true 来覆盖默认行为。spring-doc.cadn.net.cn

您还可以修改套接字和套接字工厂的属性。 有关更多信息,请参阅SSL/TLS 支持。 正如那里所注意到的,无论是否使用 SSL,都可以进行此类修改。

主机验证

从 5.1.0 版本开始,主机验证默认启用以增强安全性。 此功能确保在 TCP 连接期间验证服务器的身份。spring-doc.cadn.net.cn

如果您遇到需要禁用主机验证的场景(不推荐),您可以在 tcp-connection-factory 中配置 socket-support 属性。spring-doc.cadn.net.cn

<int-ip:tcp-connection-factory id="client"
                                type="client"
                                host="localhost"
                                port="0"
                                socket-support="customSocketSupport"
                                single-use="true"
                                so-timeout="10000"/>

<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
	<constructor-arg value="false" />
</bean>

自定义序列化器和反序列化器

如果您的数据不是由标准反序列化器支持的格式,您可以实现自己的反序列化器;您还可以实现自定义的序列化器。spring-doc.cadn.net.cn

要实现自定义的序列化和反序列化对,请实现 org.springframework.core.serializer.Deserializerorg.springframework.core.serializer.Serializer 接口。spring-doc.cadn.net.cn

当反序列化器在消息之间检测到输入流已关闭时,必须抛出 SoftEndOfStreamException;这是向框架发出的信号,表明该关闭是“正常”的。 如果在解码消息时流被关闭,则应抛出其他异常。spring-doc.cadn.net.cn

从 5.2 版本开始,SoftEndOfStreamException 现在是一个 RuntimeException,而不是扩展 IOExceptionspring-doc.cadn.net.cn

TCP 缓存客户端连接工厂

正如前文所述,TCP 套接字可以是‘单次使用’的(一个请求或响应)或共享的。 在高容量环境中,共享套接字与出站网关配合不佳,因为套接字一次只能处理一个请求或响应。spring-doc.cadn.net.cn

为了提高性能,您可以使用协作通道适配器代替网关,但这需要应用级别的消息关联。 有关更多信息,请参阅 TCP 消息关联spring-doc.cadn.net.cn

Spring Integration 2.2 引入了一个缓存客户端连接工厂,该工厂使用共享套接字池,使网关能够通过共享连接池处理多个并发请求。spring-doc.cadn.net.cn

TCP 故障转移客户端连接工厂

您可以配置一个 TCP 连接工厂,该工厂支持故障转移至一个或多个其他服务器。 发送消息时,该工厂会遍历其所有配置的工厂,直到消息成功发送或找不到可用连接为止。 初始状态下,将使用配置列表中的第一个工厂。 如果后续连接失败,则下一个工厂将成为当前工厂。 以下示例展示了如何配置故障转移客户端连接工厂:spring-doc.cadn.net.cn

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
当使用故障转移连接工厂时,singleUse 属性必须在工厂本身及其配置使用的工厂列表之间保持一致。

连接工厂有两个与故障转移相关的属性,当与共享连接(singleUse=false)一起使用时:spring-doc.cadn.net.cn

考虑以下基于上述配置的场景: 假设 clientFactory1 无法建立连接,但 clientFactory2 可以。 当 refreshSharedInterval 过期后调用 failCF getConnection() 方法时,我们将再次尝试使用 clientFactory1 进行连接;如果成功,则关闭到 clientFactory2 的连接。 如果 closeOnRefreshfalse,则“旧”连接将保持打开状态,并在首次工厂再次失败时可能被重用。spring-doc.cadn.net.cn

设置 refreshSharedInterval 表示仅在该时间过期后尝试使用第一个工厂重新连接;设置为 Long.MAX_VALUE(默认值)表示仅在当前连接失败时才回退到第一个工厂。spring-doc.cadn.net.cn

设置 closeOnRefresh 可在刷新实际创建新连接后关闭“旧”连接。spring-doc.cadn.net.cn

如果任何委托工厂是 CachingClientConnectionFactory,则这些属性不适用,因为连接缓存由该处处理;在这种情况下,将始终通过连接工厂列表来获取连接。

从版本 5.3 开始,这些默认值分别为 Long.MAX_VALUEtrue,因此工厂仅在当前连接失败时才尝试回退。 若要恢复为之前版本的默认行为,请将它们设置为 0falsespring-doc.cadn.net.cn

TCP 线程亲和连接工厂

Spring Integration 5.0 版本引入了此连接工厂。 它将连接绑定到调用线程,并且该线程每次发送消息时都会重用同一个连接。 只要连接未被关闭(由服务器或网络关闭),或者线程未调用 releaseConnection() 方法,这种重用就会持续进行。 连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(单次使用)的连接,以确保每个线程都能获得一个独立的连接。spring-doc.cadn.net.cn

以下示例展示了如何配置 TCP 线程亲和性连接工厂:spring-doc.cadn.net.cn

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}