此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

gRPC 支持

从版本 7.1 开始,Spring Integration 提供了入站和出站网关,用于通过 gRPC 协议进行通信。spring-doc.cadn.net.cn

此依赖项是项目所必需的:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-grpc</artifactId>
    <version>7.1.0-M3</version>
</dependency>
implementation "org.springframework.integration:spring-integration-grpc:7.1.0-M3"

gRPC 的 Spring Integration 组件并非从 Protocol Buffers 生成,因此不像典型的 gRPC 服务和存根实现那样具备类型安全性。 这主要是由于 Spring Integration 框架本身的通用性质:其工作单元是 Message 抽象,而该消息的有效负载类型通常超出了集成组件内部逻辑的范围。 因此,服务调用的 gRPC 消息在发送和接收时保持原样,不进行任何转换假设。 例如,如果 gRPC 服务方法如下所示:spring-doc.cadn.net.cn

service TestHelloWorld {

  // Sends a greeting
  rpc SayHello(HelloRequest) returns (HelloReply) {}

  // Sends a greeting and something else
  rpc StreamSayHello(HelloRequest) returns (stream HelloReply) {}

  // Sends a greeting to everyone present
  rpc HelloToEveryOne(stream HelloRequest) returns (HelloReply) {}

  // Streams requests and replies
  rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {}

}

The HelloRequest 将作为入站网关(服务器)侧的请求消息负载,并且必须出现在出站网关(客户端)侧的请求中。 因此,HelloReply 必须是入站网关的回复消息负载,并将在出站网关处被接收。spring-doc.cadn.net.cn

The GrpcHeaders 类包含在 gRPC 网关之前和之后用于消息的标头名称的便捷常量。 例如,GrpcHeaders.METHOD_TYPE 标头在服务端(入站网关)包含一个 io.grpc.MethodDescriptor.MethodType 枚举值,以便于下游路由。 另一个有用的标头是 GrpcHeaders.SERVICE_METHOD,它指示服务端调用了哪个 gRPC 服务方法,或客户端存根应调用哪个 gRPC 服务方法。spring-doc.cadn.net.cn

The GrpcHeaders.SERVICE_METHOD header on the inbound gateway has a value of the gRPC service method name exactly as it is declared in the Protobuf (see .proto example above) and how it is stored into the io.grpc.MethodDescriptor of the service definition.

gRPC 的入站网关

The GrpcInboundGateway 是一个实现类,用于接收 gRPC 请求、向下游流程发送消息并生成 gRPC 响应。 初始化时,该网关的实例仅需一个实现了 BindableService 的抽象 gRPC 服务类,该类通常由 Protobuf 生成,并附带一个 *ImplBase 类名。spring-doc.cadn.net.cn

仅支持标准 gRPC 服务:生成的 AsyncService 契约是 GrpcInboundGateway 逻辑的基础。 由于这些类型未从网关定义中暴露,因此在 Spring Integration 逻辑中使用基于 Reactor 和 Kotlin 的服务生成是没有意义的。

网关使用上述 AsyncService 接口创建代理并拦截 gRPC 服务方法。spring-doc.cadn.net.cn

以下示例演示如何配置 GrpcInboundGatewayspring-doc.cadn.net.cn

@Bean
GrpcInboundGateway helloWorldService() {
    return new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class);
}

The GrpcInboundGateway 实现了 BindableService,并基于提到的代理为 gRPC 服务的 AsyncService 契约暴露了 ServerServiceDefinition。 因此,此网关的实例必须注册到 ServerBuilder 中,应用程序中无需任何其他 *ImplBase 实现。spring-doc.cadn.net.cn

使用 Spring gRPC 及其对 BindableService 实现的自动发现功能时,GrpcInboundGateway 必须被声明为顶级 Bean。 因此,不推荐使用如 IntegrationFlow.from(new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class)) 这样的 Java DSL API,因为此类 BindableService 实现无法让相应的 Spring gRPC 基础设施可见。

The GrpcInboundGateway 使用 sendAndReceiveMessageReactive() API 与下游流进行交互,并适配 gRPC StreamObserver 的回复为 Mono。 如前所述,请求消息负载完全是一个 gRPC 请求消息,并期望以 gRPC 响应消息的形式返回回复。 下游逻辑可以是类型安全的,并以类似于手动实现 *ImplBase 的方式处理 gRPC 消息。spring-doc.cadn.net.cn

从下游处理逻辑的角度来看,MethodDescriptor.MethodType.UNARYMethodDescriptor.MethodType.BIDI_STREAMING 是相同的。 换句话说,BIDI_STREAMING 被作为请求项的循环进行处理,网关会立即将响应项生成到响应 StreamObserver 中。 对于不同的 BIDI_STREAMING 逻辑,建议采用常规的 gRPC 服务实现。spring-doc.cadn.net.cn

MethodDescriptor.MethodType.CLIENT_STREAMING模式会生成一条消息,其负载为gRPC请求项中的Fluxspring-doc.cadn.net.cn

对于 MethodDescriptor.MethodType.SERVER_STREAMING 模式,回复负载可以是单个 gRPC 响应消息或其中的 Flux 个。spring-doc.cadn.net.cn

下面的示例演示了所提及的IntegrationFlow服务的TestHelloWorldGrpc.TestHelloWorldImplBase实现:spring-doc.cadn.net.cn

@Bean
IntegrationFlow grpcIntegrationFlow(GrpcInboundGateway helloWorldService) {
    return IntegrationFlow.from(helloWorldService)
            .route(Message.class, message ->
                    		message.getHeaders().get(GrpcHeaders.SERVICE_METHOD, String.class),
                    router -> router

                            .subFlowMapping("SayHello", flow -> flow
                                    .transform(this::requestReply))

                            .subFlowMapping("StreamSayHello", flow -> flow
                                    .transform(this::streamReply))

                            .subFlowMapping("HelloToEveryOne", flow -> flow
                                    .transformWith(transformSpec -> transformSpec
                                          .transformer(this::streamRequest)
                                          .async(true)))

                            .subFlowMapping("BidiStreamHello", flow -> flow
                                    .transform(this::requestReply))
            )
            .get();
}

private HelloReply requestReply(HelloRequest helloRequest) {
    return newHelloReply("Hello " + helloRequest.getName());
}

private Flux<HelloReply> streamReply(HelloRequest helloRequest) {
    return Flux.just(
           newHelloReply("Hello " + helloRequest.getName()),
           newHelloReply("Hello again!"));
}

private Mono<HelloReply> streamRequest(Flux<HelloRequest> request) {
    return request
            .map(HelloRequest::getName)
            .collectList()
            .map(names -> StringUtils.collectionToDelimitedString(names, ", "))
            .map("Hello "::concat)
            .map(TestConfig::newHelloReply);
}

private static HelloReply newHelloReply(String message) {
    return HelloReply.newBuilder().setMessage(message).build();
}

路由由GrpcHeaders.SERVICE_METHOD头信息完成,该头信息由GrpcInboundGateway填充。 对于TestHelloWorldGrpc.TestHelloWorldImplBase服务,所有下游转换器的业务方法在gRPC消息方面都是类型安全的。spring-doc.cadn.net.cn

使用 DSL 进行配置

使用 Grpc 工厂将 GrpcInboundGateway 添加到通过 Java DSL 构建的流程中。spring-doc.cadn.net.cn

@Bean
IntegrationFlow grpcInboundFlow() {
    return IntegrationFlow.from(
                    Grpc.inboundGateway(TestSingleHelloWorldGrpc.TestSingleHelloWorldImplBase.class)
                        .requestTimeout(3000L))
            .transform(this::requestReply)
            .get();
}

private HelloReply requestReply(HelloRequest helloRequest) {
    return HelloReply.newBuilder().setMessage("Hello " + helloRequest.getName()).build();
}

gRPC 出站网关

The GrpcOutboundGateway 是一个用于向远程 gRPC 服务器发送 gRPC 请求并作为 gRPC 存根接收响应的 AbstractReplyProducingMessageHandler 实现。 进行初始化时,此网关的实例需要一个 gRPC Channel 和 gRPC 服务类(例如:TestHelloWorldGrpc.class)。spring-doc.cadn.net.cn

网关动态调用从服务的ServiceDescriptor获取的 gRPC 方法。 它支持以下 gRPC 通信模式:spring-doc.cadn.net.cn

默认情况下,GrpcOutboundGateway 是异步的。 可以在网关配置中通过 setAsync(false) 将其关闭。 有关更多信息,请参阅 异步服务激活器

方法名配置

调用方法的名称可以通过四种方式进行配置:spring-doc.cadn.net.cn

  1. 自动检测 具有单个方法的服务:spring-doc.cadn.net.cn

    @Bean
    public GrpcOutboundGateway grpcOutboundGateway(ManagedChannel channel) {
        // When TestSingleMethodGrpc has only one method, it will be auto-detected
        return new GrpcOutboundGateway(channel, TestSingleMethodGrpc.class);
    }
  2. 显式方法名 使用 setMethodName():spring-doc.cadn.net.cn

    @Bean
    public GrpcOutboundGateway grpcOutboundGateway(ManagedChannel channel) {
        GrpcOutboundGateway gateway = new GrpcOutboundGateway(channel, TestHelloWorldGrpc.class);
        gateway.setMethodName("SayHello");
        return gateway;
    }
  3. 动态解析 通过 setMethodNameExpression()spring-doc.cadn.net.cn

    @Bean
    public GrpcOutboundGateway dynamicMethodGateway(ManagedChannel channel) {
        GrpcOutboundGateway gateway = new GrpcOutboundGateway(channel, TestHelloWorldGrpc.class);
        gateway.setMethodNameExpression(new SpelExpressionParser().parseExpression("payload.class.simpleName"));
        return gateway;
    }
  4. 默认方法解析 如果未配置方法名称或方法名称表达式,且服务提供多个方法,网关将在输入消息中查找 GrpcHeaders.SERVICE_METHOD 标头以确定调用哪个方法。 如果缺少 GrpcHeaders.SERVICE_METHOD 标头,将抛出 IllegalStateExceptionspring-doc.cadn.net.cn

    @Bean
    public GrpcOutboundGateway dynamicMethodGateway(ManagedChannel channel) {
    	 // Looks for GrpcHeaders.SERVICE_METHOD header in the input message
        return new GrpcOutboundGateway(channel, TestHelloWorldGrpc.class);
    }

请求负载处理

The GrpcOutboundGateway 自动从 MethodDescriptor 中检测方法类型并适当处理调用:spring-doc.cadn.net.cn

客户端流式方法返回一个 Mono<ResponseType>,而双向流式方法返回包含响应的 Flux<ResponseType>spring-doc.cadn.net.cn

使用 DSL 进行配置

使用 Grpc 工厂通过 Java DSLGrpcOutboundGateway 添加到流程中。 具有单个方法的服务的最简单配置:spring-doc.cadn.net.cn

@Bean
IntegrationFlow grpcOutboundFlow(ManagedChannel channel) {
	return f -> f
            .handle(Grpc.outboundGateway(channel, TestSingleHelloWorldGrpc.class))
            .transform(this::upperCase);
}

private HelloReply upperCase(HelloReply helloReply) {
    return HelloReply.newBuilder().setMessage(helloReply.getMessage().toUpperCase()).build();
}

当 gRPC 服务仅包含一个方法时,它将被自动检测。spring-doc.cadn.net.cn

对于具有多个方法的服务,请使用 DSL 的.methodName().methodNameExpression().methodNameFunction()方法。 查看方法名称配置spring-doc.cadn.net.cn