|
此版本仍在开发中,尚未被视为稳定版。如需最新稳定版本,请使用 Spring Integration 7.0.4! |
CloudEvents 支持
Spring Integration 提供了对 CloudEvents 规范 的支持。
将以下依赖项添加到您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-cloudevents</artifactId>
<version>7.1.0-M3</version>
</dependency>
implementation "org.springframework.integration:spring-integration-cloudevents:7.1.0-M3"
ToCloudEventTransformer
使用 ToCloudEventTransformer 将 Spring Integration 消息转换为符合 CloudEvents 规范的消息。
此转换器支持 CloudEvents 规范 v1.0,并在指定 EventFormat 或 eventFormatContentTypeExpression 时序列化 CloudEvents。
当您指定 EventFormat 或 eventFormatContentTypeExpression 时,转换器会使用 EventFormat 生成负载中的 CloudEvent。
如果未指定任何选项,转换器将按原样写入事件数据消息负载,并将属性及扩展添加到消息头中。
该转换器支持使用表达式定义属性,并通过模式识别消息头中的扩展。
属性表达式
通过 SpEL 表达式设置 id、source、type、dataSchema 和 subject 的 CloudEvents 属性。
转换器将 time 属性设置为创建 CloudEvent 实例的时间。 |
下表列出了属性名称以及默认表达式返回的值。
| 属性名称 | 默认值 |
|---|---|
|
消息的 ID。 |
|
以“/spring/”为前缀,后接应用名称、一个点号,然后是转换器的 Bean 名称,例如 |
|
"spring.message" |
|
消息的 contentType,默认为 |
|
指定模式的 URI。
|
|
识别事件在事件生产者上下文中的主题。
|
|
CloudEvent 消息创建的时间。 内部设置为当前时间。 注意:您无法配置此值。 |
扩展模式
使用 extensionPatterns 构造函数参数(字符串可变参数)来指定带有通配符的模式匹配(*)。
该转换器会将键匹配任意模式的消息头作为 CloudEvent 扩展包含在内。
使用前缀 ! 可通过否定显式排除某些消息头。
请注意,第一个匹配的模式将生效(无论是正匹配还是负匹配)。
例如,将模式 "trace*", "span-id", "user-id" 配置为:
- 包含以 trace 开头的标头(例如 trace-id、traceparent)
- 包含键名为 span-id 和 user-id 的精确匹配标头
- 将所有匹配的标头作为扩展添加到 CloudEvent 中
要排除特定标头,请使用否定模式:"custom-*", "!custom-internal"包含所有以custom-开头的标头,但custom-internal除外。
使用 DSL 进行配置
使用 CloudEvents 工厂将 ToCloudEventTransformer 添加到通过 Java DSL 构建的流程中。
@Bean
public ToCloudEventTransformer cloudEventTransformer() {
return new ToCloudEventTransformer("trace*", "correlation-id");
}
@Bean
public IntegrationFlow cloudEventTransformFlow(ToCloudEventTransformer toCloudEventTransformer) {
return IntegrationFlows
.from("inputChannel")
.transform(CloudEvents.toCloudEventTransformer().get())
.channel("outputChannel")
.get();
}
CloudEvent 转换处理
了解转换过程:
-
构建 CloudEvent - 构建 CloudEvent 属性。
-
扩展提取 - 使用传入构造函数的 extensionPatterns 数组构建 CloudEvent 扩展。
-
格式转换 - 应用指定的
EventFormat,如果未设置,则通过二进制格式模式处理转换。
一个基本的转换可能具有以下模式:
// Input message with headers
Message<byte[]> inputMessage = MessageBuilder
.withPayload("Hello CloudEvents".getBytes(StandardCharsets.UTF_8))
.withHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
.build();
ToCloudEventTransformer transformer = new ToCloudEventTransformer();
// Transform to CloudEvent
Object cloudEventMessage = transformer.transform(inputMessage);
事件格式
ToCloudEventTransformer 使用格式化将 CloudEvent 序列化为消息的负载(当 EventFormat 可用时),否则使用二进制格式模式。
通过以下两种方式之一设置 EventFormat:
-
设置所需的
EventFormat。 -
使用解析为内容类型的表达式设置
eventFormatContentTypeExpression,该类型是EventFormatProvider可用于提供所需EventFormat的类型。 当设置了eventFormatContentTypeExpression且EventFormatProvider因找不到对应内容类型的EventFormat而返回 null 时,转换器将抛出MessageTransformationException。eventFormatContentTypeExpression可解析为以下由EventFormatProvider接受的内容类型示例:-
application/cloudevents+json -
application/cloudevents+xml
-
如果 EventFormat 和 eventFormatContentTypeExpression 未设置,转换器将使用云事件前缀(默认为 ce-)向消息头添加云事件属性和扩展,并保持负载不变(二进制格式模式)。
要使用特定的 EventFormat,请添加相关的依赖项。
例如,要添加 XML EventFormat,请添加以下依赖项:io.cloudevents:cloudevents-xml。
有关可用的事件格式信息,请参阅 CloudEvents Java 参考文档。
确保要转换为 CloudEvents 的消息具有类型为 byte[] 的有效载荷。
如果有效载荷不是字节数组,转换器将抛出 IllegalArgumentException。 |
FromCloudEventTransformer
使用 FromCloudEventTransformer 将 CloudEvents 转换为 Spring Integration 消息。
此转换器支持 CloudEvents 规范 v1.0,并处理来自两种有效载荷类型的 CloudEvents:CloudEvent 对象或序列化的 CloudEvent 字节数组。
该转换器从消息负载中提取 CloudEvent 条数据,并将 CloudEvent 个属性以及 CloudEvent 个扩展映射到带有 ce- 前缀的消息头。
支持的负载类型
转换器接受以下有效载荷类型的消息:
CloudEvent 对象类型
当消息负载为 CloudEvent 实例时,转换器:
-
提取 CloudEvent 数据并将其用作消息负载。
-
将 CloudEvent 属性(
id、source、type、time、subject、datacontenttype、dataschema)映射为带有ce-前缀的消息头。 -
使用
ce-前缀将所有 CloudEvent 扩展映射到消息头。 -
保留所有原始消息头,除非某个头部键匹配
CloudEvent属性或扩展,在这种情况下将覆盖原始值。
示例:
String orderJson = ...
CloudEvent cloudEvent = CloudEventBuilder.v1()
.withId("event-123")
.withSource(URI.create("/myapp/orders"))
.withType("order.created")
.withData("application/json", orderJson.getBytes())
.withExtension("traceid", "trace-abc")
.build();
Message<CloudEvent> inputMessage = MessageBuilder
.withPayload(cloudEvent)
.build();
FromCloudEventTransformer transformer = new FromCloudEventTransformer();
Message<?> outputMessage = transformer.transform(inputMessage);
上述示例中的 outputMessage 产生的输出类似于以下内容:
GenericMessage [
payload = byte[13],
headers = {
ce-source = /myapp/orders,
ce-datacontenttype = application/json,
ce-type = order.created,
ce-id = event-123,
ce-traceid = trace-abc,
id = 2df76f27-d139-424c-19b6-80b64e4a33b0,
contentType = application/json,
timestamp = 1770667476433
}
]
序列化的 CloudEvent 类型
当消息负载是包含序列化 CloudEvent 的 byte[] 时,转换器:
-
使用
content-type标头通过EventFormatProvider解析适当的EventFormat。 -
使用已解析的格式将有效负载反序列化为
CloudEvent对象。 -
遵循 CloudEvent 对象类型 部分中列出的相同步骤。
关于支持的内容类型的信息在 EventFormats 部分进行了讨论。
FromCloudEventTransformer 允许用户设置一个 EventFormat,当 EventFormatProvider 无法为 contentType 头找到 EventFormat,或者消息中不包含 contentType 头时,将使用该值。
如果未设置且 EventFormatProvider 未找到 EventFormat,则将抛出 MessageTransformationException。 |
示例:
byte[] serializedCloudEvent = """
{
"specversion": "1.0",
"id": "316b0cf3-0c4d-5858-6bd2-863a2042f442",
"source": "/spring/testapp.jsonTransformerWithExtensions",
"type": "spring.message",
"subject": "test.subject",
"datacontenttype": "text/plain",
"time": "2026-01-30T08:53:06.099486-05:00",
"traceid": "trace-123",
"data": "Hello, World!"
}
""";
Message<byte[]> inputMessage = MessageBuilder
.withPayload(serializedCloudEvent)
.setHeader(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json")
.build();
FromCloudEventTransformer transformer = new FromCloudEventTransformer();
Message<?> outputMessage = transformer.transform(inputMessage);
上述示例中的 outputMessage 产生的输出类似于以下内容:
GenericMessage [
payload = byte[13],
headers = {
ce-source = /spring/testapp.jsonTransformerWithExtensions,
ce-datacontenttype = text/plain,
ce-subject = test.subject,
ce-type = spring.message,
ce-id = 316b0cf3-0c4d-5858-6bd2-863a2042f442,
ce-traceid = trace-123,
ce-time = 2026-01-30T08:53:06.099486-05:00,
id = 463c0878-a9cb-7269-a503-b4224088cd42,
contentType = text/plain,
timestamp = 1770392214225
}
]
CloudEvent 属性映射
转换器使用以下 CloudEventHeaders 常量将 CloudEvent 属性映射到消息头:
| CloudEvent 属性 | 消息头键 | 必填 |
|---|---|---|
|
|
是的 |
|
|
是的 |
|
|
是的 |
|
|
No |
|
|
No |
|
|
No |
|
|
No |
扩展 |
|
No |
输出消息中的 contentType 标头始终设置为 CloudEvent 的 datacontenttype 值。 |
使用 DSL 进行配置
使用 CloudEvents 工厂将 FromCloudEventTransformer 添加到通过 Java DSL 构建的流程中。
@Bean
public FromCloudEventTransformer fromCloudEventTransformer() {
return new FromCloudEventTransformer();
}
@Bean
public IntegrationFlow fromCloudEventFlow(FromCloudEventTransformer fromCloudEventTransformer) {
return IntegrationFlows
.from("cloudEventInputChannel")
.transform(CloudEvents.fromCloudEventTransformer())
.channel("messageOutputChannel")
.get();
}
CloudEvent 标头构建器
CloudEventHeadersBuilder 可用于在使用 Java DSL 时添加 CloudEvent 属性。
该构建器可通过 CloudEvents.headers() 工厂方法访问。
构建器支持三种设置头部值的方式:
-
直接值 - 直接设置静态值
-
SpEL 表达式 - 使用 Spring 表达式语言基于消息实现动态值
-
函数 - 提供基于消息值进行求值的 Java 函数
基本用法
@Bean
public IntegrationFlow cloudEventFlow() {
return IntegrationFlow
.from("inputChannel")
.enrichHeaders(CloudEvents.headers()
.idExpression("headers.orderId")
.sourceFunction(msg -> URI.create("https://example.com/" + msg.getHeaders().get("action")))
.type("order.created")
.subject("order-processing"))
.transform(CloudEvents.toCloudEventTransformer())
.channel("outputChannel")
.get();
}