如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

关于非阻塞 I/O (NIO)

使用 NIO(参见 using-nio 中的 IP 配置属性)可避免为每个套接字分配一个线程进行读取。 对于少量套接字,您很可能会发现,不使用 NIO 并结合异步交接(例如交接至 QueueChannel),其性能表现与使用 NIO 相当甚至更好。spring-doc.cadn.net.cn

在处理大量连接时,您应该考虑使用 NIO。 然而,使用 NIO 会带来一些其他影响。 线程池(在任务执行器中)在所有套接字之间共享。 每个传入消息都会被组装,并作为独立的工作单元发送到配置的通道,该工作由从池中选择的线程处理。 同一套接字上连续到达的两个消息可能会由不同的线程处理。 这意味着发送到通道的消息顺序是不确定的。 不保证套接字上到达的消息的严格顺序。spring-doc.cadn.net.cn

对于某些应用程序,这不是问题。 对于其他应用程序,这是一个问题。 如果您需要严格的顺序,请考虑将 using-nio 设置为 false 并使用异步交接。spring-doc.cadn.net.cn

或者,您可以在入站端点之后插入一个重排序器,将消息恢复为正确的顺序。 如果在连接工厂上将 apply-sequence 设置为 true,则到达 TCP 连接的消息将设置 sequenceNumbercorrelationId 标头。 重排序器使用这些标头将消息恢复为正确的顺序。spring-doc.cadn.net.cn

从 5.1.4 版本开始,优先接受新连接而非读取现有连接。 通常情况下,除非您有非常高的新入站连接速率,否则这应该几乎没有影响。 如果您希望恢复到之前优先读取的行为,请将 multiAccept 属性设置为 false(位于 TcpNioServerConnectionFactory 上)。

连接池大小

池大小属性不再被使用。 此前,它指定了未指定任务执行器时的默认线程池大小。 它也用于设置服务器套接字的连接积压数。 第一个功能已不再需要(见下一段)。 第二个功能由 backlog 属性替代。spring-doc.cadn.net.cn

此前,在使用固定线程池任务执行器(这是默认配置)配合 NIO 时,可能会出现死锁,导致处理停止。 当缓冲区已满、从套接字读取数据的线程试图向缓冲区添加更多数据,且没有可用线程来腾出缓冲区空间时,就会发生此问题。 这种情况仅在线程池大小非常小时出现,但在极端条件下也可能发生。 自 2.2 版本起,两项更改已消除此问题。 首先,默认的任务执行器改为缓存线程池执行器。 其次,添加了死锁检测逻辑:如果发生线程饥饿,系统将不再死锁,而是抛出异常,从而释放被死锁占用的资源。spring-doc.cadn.net.cn

现在默认的任务执行器是无限制的,如果消息处理耗时较长,在高频率的消息流入情况下,可能会发生内存溢出条件。 如果您的应用程序表现出这种行为,您应该使用具有适当池大小的池化任务执行器,但请参见下一节

线程池任务执行器与CALLER_RUNSpolicy

使用固定线程池时,您应当牢记一些重要的注意事项:当 CallerRunsPolicy(在使用 <task/> 命名空间时为 CALLER_RUNS)且队列容量较小时。spring-doc.cadn.net.cn

如果您不使用固定线程池,则以下规则不适用。spring-doc.cadn.net.cn

使用 NIO 连接时,存在三种不同的任务类型。 I/O 选择器的处理由一个专用线程执行(检测事件、接受新连接,并通过任务执行器将 I/O 读操作分派给其他线程)。 当 I/O 读取线程(读操作被分派到该线程)读取数据时,它将把数据交给另一个线程来组装传入的消息。 大消息可能需要多次读取才能完成。 这些“组装”线程在等待数据时可能会阻塞。 当发生新的读取事件时,读取器会检查该套接字是否已有组装线程,如果没有,则启动一个新的组装线程。 组装过程完成后,组装线程将被返回到线程池中。spring-doc.cadn.net.cn

当连接池耗尽、使用CALLER_RUNS拒绝策略且任务队列已满时,这可能导致死锁。 当连接池为空且队列中没有空间时,IO选择器线程会收到OP_READ事件,并使用执行器分发读取操作。 由于队列已满,选择器线程本身开始启动读取过程。 此时它检测到该套接字没有对应的组装器,并在执行读取之前触发一个组装器。 再次出现队列已满的情况,选择器线程变成了组装器。 现在组装器被阻塞,等待数据被读取,但数据永远不会到来。 连接工厂因此陷入死锁,因为选择器线程无法处理新的事件。spring-doc.cadn.net.cn

为了避免这种死锁,我们必须避免选择器(或读取)线程执行组装任务。 我们希望为 IO 操作和组装操作使用独立的线程池。spring-doc.cadn.net.cn

该框架提供了一个CompositeExecutor,允许配置两个独立的执行器:一个用于执行IO操作,另一个用于消息组装。 在此环境中,IO线程永远无法成为组装线程,因此不会发生死锁。spring-doc.cadn.net.cn

此外,任务执行器应配置为使用 AbortPolicy(使用 <task> 时为 ABORT)。 当 I/O 任务无法完成时,会暂时推迟并持续重试,直到能够完成并分配一个组装器。 默认延迟时间为 100ms,但您可以通过在连接工厂上设置 readDelay 属性来更改它(在使用 XML 命名空间配置时为 read-delay)。spring-doc.cadn.net.cn

以下三个示例展示了如何配置组合执行器:spring-doc.cadn.net.cn

@Bean
private CompositeExecutor compositeExecutor() {
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>