此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Integration 7.0.4spring-doc.cadn.net.cn

CloudEvents 支持

Spring Integration 提供了对 CloudEvents 规范 的支持。spring-doc.cadn.net.cn

将以下依赖项添加到您的项目中:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-cloudevents</artifactId>
    <version>7.1.0-M3</version>
</dependency>
implementation "org.springframework.integration:spring-integration-cloudevents:7.1.0-M3"

ToCloudEventTransformer

使用 ToCloudEventTransformer 将 Spring Integration 消息转换为符合 CloudEvents 规范的消息。 此转换器支持 CloudEvents 规范 v1.0,并在指定 EventFormateventFormatContentTypeExpression 时序列化 CloudEvents。 当您指定 EventFormateventFormatContentTypeExpression 时,转换器会使用 EventFormat 生成负载中的 CloudEvent。 如果未指定任何选项,转换器将按原样写入事件数据消息负载,并将属性及扩展添加到消息头中。 该转换器支持使用表达式定义属性,并通过模式识别消息头中的扩展。spring-doc.cadn.net.cn

属性表达式

通过 SpEL 表达式设置 idsourcetypedataSchemasubject 的 CloudEvents 属性。spring-doc.cadn.net.cn

转换器将 time 属性设置为创建 CloudEvent 实例的时间。

下表列出了属性名称以及默认表达式返回的值。spring-doc.cadn.net.cn

属性名称 默认值

idspring-doc.cadn.net.cn

消息的 ID。spring-doc.cadn.net.cn

sourcespring-doc.cadn.net.cn

以“/spring/”为前缀,后接应用名称、一个点号,然后是转换器的 Bean 名称,例如 /spring/myapp.toCloudEventTransformerBeanspring-doc.cadn.net.cn

typespring-doc.cadn.net.cn

"spring.message"spring-doc.cadn.net.cn

dataContentTypespring-doc.cadn.net.cn

消息的 contentType,默认为 application/octet-stream。 其他一些示例包括但不限于:application/jsonapplication/x-avroapplication/xmlspring-doc.cadn.net.cn

dataSchemaspring-doc.cadn.net.cn

指定模式的 URI。 dataSchema 的默认值为 nullspring-doc.cadn.net.cn

subjectspring-doc.cadn.net.cn

识别事件在事件生产者上下文中的主题。 subject 的默认值是 nullspring-doc.cadn.net.cn

timespring-doc.cadn.net.cn

CloudEvent 消息创建的时间。 内部设置为当前时间。 注意:您无法配置此值。spring-doc.cadn.net.cn

扩展模式

使用 extensionPatterns 构造函数参数(字符串可变参数)来指定带有通配符的模式匹配(*)。 该转换器会将键匹配任意模式的消息头作为 CloudEvent 扩展包含在内。 使用前缀 ! 可通过否定显式排除某些消息头。 请注意,第一个匹配的模式将生效(无论是正匹配还是负匹配)。spring-doc.cadn.net.cn

例如,将模式 "trace*", "span-id", "user-id" 配置为: - 包含以 trace 开头的标头(例如 trace-idtraceparent) - 包含键名为 span-iduser-id 的精确匹配标头 - 将所有匹配的标头作为扩展添加到 CloudEvent 中spring-doc.cadn.net.cn

要排除特定标头,请使用否定模式:"custom-*", "!custom-internal"包含所有以custom-开头的标头,但custom-internal除外。spring-doc.cadn.net.cn

使用 DSL 进行配置

使用 CloudEvents 工厂将 ToCloudEventTransformer 添加到通过 Java DSL 构建的流程中。spring-doc.cadn.net.cn

@Bean
public ToCloudEventTransformer cloudEventTransformer() {
    return new ToCloudEventTransformer("trace*", "correlation-id");
}

@Bean
public IntegrationFlow cloudEventTransformFlow(ToCloudEventTransformer toCloudEventTransformer) {
    return IntegrationFlows
        .from("inputChannel")
        .transform(CloudEvents.toCloudEventTransformer().get())
        .channel("outputChannel")
        .get();
}

CloudEvent 转换处理

了解转换过程:spring-doc.cadn.net.cn

  1. 构建 CloudEvent - 构建 CloudEvent 属性。spring-doc.cadn.net.cn

  2. 扩展提取 - 使用传入构造函数的 extensionPatterns 数组构建 CloudEvent 扩展。spring-doc.cadn.net.cn

  3. 格式转换 - 应用指定的 EventFormat,如果未设置,则通过二进制格式模式处理转换。spring-doc.cadn.net.cn

一个基本的转换可能具有以下模式:spring-doc.cadn.net.cn

// Input message with headers
Message<byte[]> inputMessage = MessageBuilder
    .withPayload("Hello CloudEvents".getBytes(StandardCharsets.UTF_8))
    .withHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
    .build();

ToCloudEventTransformer transformer = new ToCloudEventTransformer();

// Transform to CloudEvent
Object cloudEventMessage = transformer.transform(inputMessage);

事件格式

ToCloudEventTransformer 使用格式化将 CloudEvent 序列化为消息的负载(当 EventFormat 可用时),否则使用二进制格式模式。 通过以下两种方式之一设置 EventFormatspring-doc.cadn.net.cn

  1. 设置所需的 EventFormatspring-doc.cadn.net.cn

  2. 使用解析为内容类型的表达式设置 eventFormatContentTypeExpression,该类型是 EventFormatProvider 可用于提供所需 EventFormat 的类型。 当设置了 eventFormatContentTypeExpressionEventFormatProvider 因找不到对应内容类型的 EventFormat 而返回 null 时,转换器将抛出 MessageTransformationExceptioneventFormatContentTypeExpression 可解析为以下由 EventFormatProvider 接受的内容类型示例:spring-doc.cadn.net.cn

如果 EventFormateventFormatContentTypeExpression 未设置,转换器将使用云事件前缀(默认为 ce-)向消息头添加云事件属性和扩展,并保持负载不变(二进制格式模式)。spring-doc.cadn.net.cn

要使用特定的 EventFormat,请添加相关的依赖项。 例如,要添加 XML EventFormat,请添加以下依赖项:io.cloudevents:cloudevents-xml。 有关可用的事件格式信息,请参阅 CloudEvents Java 参考文档spring-doc.cadn.net.cn

确保要转换为 CloudEvents 的消息具有类型为 byte[] 的有效载荷。 如果有效载荷不是字节数组,转换器将抛出 IllegalArgumentException

FromCloudEventTransformer

使用 FromCloudEventTransformer 将 CloudEvents 转换为 Spring Integration 消息。 此转换器支持 CloudEvents 规范 v1.0,并处理来自两种有效载荷类型的 CloudEvents:CloudEvent 对象或序列化的 CloudEvent 字节数组。spring-doc.cadn.net.cn

该转换器从消息负载中提取 CloudEvent 条数据,并将 CloudEvent 个属性以及 CloudEvent 个扩展映射到带有 ce- 前缀的消息头。spring-doc.cadn.net.cn

支持的负载类型

转换器接受以下有效载荷类型的消息:spring-doc.cadn.net.cn

CloudEvent 对象类型

当消息负载为 CloudEvent 实例时,转换器:spring-doc.cadn.net.cn

  1. 提取 CloudEvent 数据并将其用作消息负载。spring-doc.cadn.net.cn

  2. 将 CloudEvent 属性(idsourcetypetimesubjectdatacontenttypedataschema)映射为带有 ce- 前缀的消息头。spring-doc.cadn.net.cn

  3. 使用 ce- 前缀将所有 CloudEvent 扩展映射到消息头。spring-doc.cadn.net.cn

  4. 保留所有原始消息头,除非某个头部键匹配 CloudEvent 属性或扩展,在这种情况下将覆盖原始值。spring-doc.cadn.net.cn

String orderJson = ...

CloudEvent cloudEvent = CloudEventBuilder.v1()
    .withId("event-123")
    .withSource(URI.create("/myapp/orders"))
    .withType("order.created")
    .withData("application/json", orderJson.getBytes())
    .withExtension("traceid", "trace-abc")
    .build();

Message<CloudEvent> inputMessage = MessageBuilder
    .withPayload(cloudEvent)
    .build();

FromCloudEventTransformer transformer = new FromCloudEventTransformer();
Message<?> outputMessage = transformer.transform(inputMessage);

上述示例中的 outputMessage 产生的输出类似于以下内容:spring-doc.cadn.net.cn

GenericMessage [
    payload = byte[13],
    headers = {
        ce-source          = /myapp/orders,
        ce-datacontenttype = application/json,
        ce-type            = order.created,
        ce-id              = event-123,
        ce-traceid         = trace-abc,
        id                 = 2df76f27-d139-424c-19b6-80b64e4a33b0,
        contentType        = application/json,
        timestamp          = 1770667476433
    }
]

序列化的 CloudEvent 类型

当消息负载是包含序列化 CloudEventbyte[] 时,转换器:spring-doc.cadn.net.cn

  1. 使用 content-type 标头通过 EventFormatProvider 解析适当的 EventFormatspring-doc.cadn.net.cn

  2. 使用已解析的格式将有效负载反序列化为 CloudEvent 对象。spring-doc.cadn.net.cn

  3. 遵循 CloudEvent 对象类型 部分中列出的相同步骤。spring-doc.cadn.net.cn

关于支持的内容类型的信息在 EventFormats 部分进行了讨论。spring-doc.cadn.net.cn

FromCloudEventTransformer 允许用户设置一个 EventFormat,当 EventFormatProvider 无法为 contentType 头找到 EventFormat,或者消息中不包含 contentType 头时,将使用该值。 如果未设置且 EventFormatProvider 未找到 EventFormat,则将抛出 MessageTransformationException
byte[] serializedCloudEvent = """
				{
					"specversion": "1.0",
					"id": "316b0cf3-0c4d-5858-6bd2-863a2042f442",
					"source": "/spring/testapp.jsonTransformerWithExtensions",
					"type": "spring.message",
					"subject": "test.subject",
					"datacontenttype": "text/plain",
					"time": "2026-01-30T08:53:06.099486-05:00",
					"traceid": "trace-123",
					"data": "Hello, World!"
				}
				""";

Message<byte[]> inputMessage = MessageBuilder
    .withPayload(serializedCloudEvent)
    .setHeader(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json")
    .build();

FromCloudEventTransformer transformer = new FromCloudEventTransformer();
Message<?> outputMessage = transformer.transform(inputMessage);

上述示例中的 outputMessage 产生的输出类似于以下内容:spring-doc.cadn.net.cn

GenericMessage [
    payload = byte[13],
    headers = {
        ce-source           = /spring/testapp.jsonTransformerWithExtensions,
        ce-datacontenttype  = text/plain,
        ce-subject          = test.subject,
        ce-type             = spring.message,
        ce-id               = 316b0cf3-0c4d-5858-6bd2-863a2042f442,
        ce-traceid          = trace-123,
        ce-time             = 2026-01-30T08:53:06.099486-05:00,
        id                  = 463c0878-a9cb-7269-a503-b4224088cd42,
        contentType         = text/plain,
        timestamp           = 1770392214225
    }
]

CloudEvent 属性映射

转换器使用以下 CloudEventHeaders 常量将 CloudEvent 属性映射到消息头:spring-doc.cadn.net.cn

CloudEvent 属性 消息头键 必填

idspring-doc.cadn.net.cn

ce-idspring-doc.cadn.net.cn

是的spring-doc.cadn.net.cn

sourcespring-doc.cadn.net.cn

ce-sourcespring-doc.cadn.net.cn

是的spring-doc.cadn.net.cn

typespring-doc.cadn.net.cn

ce-typespring-doc.cadn.net.cn

是的spring-doc.cadn.net.cn

timespring-doc.cadn.net.cn

ce-timespring-doc.cadn.net.cn

Nospring-doc.cadn.net.cn

subjectspring-doc.cadn.net.cn

ce-subjectspring-doc.cadn.net.cn

Nospring-doc.cadn.net.cn

datacontenttypespring-doc.cadn.net.cn

ce-datacontenttypespring-doc.cadn.net.cn

Nospring-doc.cadn.net.cn

dataschemaspring-doc.cadn.net.cn

ce-dataschemaspring-doc.cadn.net.cn

Nospring-doc.cadn.net.cn

扩展spring-doc.cadn.net.cn

ce-{extensionName}spring-doc.cadn.net.cn

Nospring-doc.cadn.net.cn

输出消息中的 contentType 标头始终设置为 CloudEvent 的 datacontenttype 值。

使用 DSL 进行配置

使用 CloudEvents 工厂将 FromCloudEventTransformer 添加到通过 Java DSL 构建的流程中。spring-doc.cadn.net.cn

@Bean
public FromCloudEventTransformer fromCloudEventTransformer() {
    return new FromCloudEventTransformer();
}

@Bean
public IntegrationFlow fromCloudEventFlow(FromCloudEventTransformer fromCloudEventTransformer) {
    return IntegrationFlows
        .from("cloudEventInputChannel")
        .transform(CloudEvents.fromCloudEventTransformer())
        .channel("messageOutputChannel")
        .get();
}

CloudEvent 标头构建器

CloudEventHeadersBuilder 可用于在使用 Java DSL 时添加 CloudEvent 属性。 该构建器可通过 CloudEvents.headers() 工厂方法访问。spring-doc.cadn.net.cn

构建器支持三种设置头部值的方式:spring-doc.cadn.net.cn

  1. 直接值 - 直接设置静态值spring-doc.cadn.net.cn

  2. SpEL 表达式 - 使用 Spring 表达式语言基于消息实现动态值spring-doc.cadn.net.cn

  3. 函数 - 提供基于消息值进行求值的 Java 函数spring-doc.cadn.net.cn

基本用法

@Bean
public IntegrationFlow cloudEventFlow() {
    return IntegrationFlow
        .from("inputChannel")
        .enrichHeaders(CloudEvents.headers()
            .idExpression("headers.orderId")
            .sourceFunction(msg -> URI.create("https://example.com/" + msg.getHeaders().get("action")))
            .type("order.created")
            .subject("order-processing"))
        .transform(CloudEvents.toCloudEventTransformer())
        .channel("outputChannel")
        .get();
}