如需使用最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

Kotlin 支持

该框架也已得到改进,以支持用于函数的 Kotlin Lambda,因此现在您可以结合使用 Kotlin 语言和 Spring Integration 流定义:spring-doc.cadn.net.cn

@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
    return { it.toUpperCase() }
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
    return { print(it) }
}

@Bean
@InboundChannelAdapter(value = "counterChannel",
        poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
    return { "baz" }
}

Kotlin 协程

从 6.0 版本开始,Spring Integration 支持 Kotlin 协程。 现在可以在服务方法中使用 suspend 函数以及 kotlinx.coroutines.Deferred & kotlinx.coroutines.flow.Flow 返回类型:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()

@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
    flow {
        for (i in 1..3) {
            emit("$payload #$i")
        }
    }

该框架将它们视为响应式流(Reactive Streams)交互,并使用 ReactiveAdapterRegistry 转换为相应的 MonoFlux 反应类型。 此类函数回复随后在回复通道中进行处理:如果它是 ReactiveStreamsSubscribableChannel,则作为 CompletableFuture 的结果在各自的回调中处理。spring-doc.cadn.net.cn

@ServiceActivator 上,返回值为 Flow 的函数默认不是 async,因此会生成 Flow 实例作为回复消息的有效载荷。 目标应用程序有责任将该对象作为协程进行处理,或将其分别转换为 Flux

The @MessagingGateway 接口方法在 Kotlin 中声明时也可以使用 suspend 修饰符进行标记。 框架内部利用 Mono 通过下游流执行请求 - 回复操作。 此类 Mono 结果由 MonoKt.awaitSingleOrNull() API 内部处理,以满足网关调用的 suspend 函数的 kotlin.coroutines.Continuation 参数要求:spring-doc.cadn.net.cn

@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {

    suspend fun suspendGateway(payload: String): String

}

根据 Kotlin 语言要求,此方法必须作为协程调用:spring-doc.cadn.net.cn

@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway

fun someServiceMethod() {
    runBlocking {
        val reply = suspendFunGateway.suspendGateway("test suspend gateway")
    }
}