|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
Debezium 支持
Debezium 引擎,变更数据捕获(CDC)入站通道适配器。
DebeziumMessageProducer 允许捕获数据库变更事件,将其转换为消息,并随后流式传输到出站通道。
您需要将 Spring Integration Debezium 依赖项添加到项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-debezium</artifactId>
<version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.4.10"
您还需要为您的输入数据库包含一个 Debezium 连接器 依赖项。 例如,要在 PostgreSQL 中使用 Debezium,您需要 postgres Debezium 连接器:
-
Maven
-
Gradle
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"
|
将 |
入站 Debezium 通道适配器
Debezium 适配器期望一个预先配置的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> 实例。
|
The debezium-supplier 提供了一个开箱即用的 |
|
Debezium Java DSL 可以从提供的 |
此外,DebeziumMessageProducer 可通过以下配置属性进行调整:
-
contentType- 允许处理JSON(默认)、AVRO和PROTOBUF消息内容。 contentTypemust应与为提供的DebeziumEngine.Builder配置的SerializationFormat保持一致。 -
enableBatch- 当设置为false(默认值)时,Debezium 适配器会在从源数据库接收到每个ChangeEvent数据变更事件时发送新的Message。 如果设置为true,则适配器会为从 Debezium 引擎接收到的每批ChangeEvent向下游发送单个Message。 此类负载不可序列化,需要自定义的序列化/反序列化实现。 -
enableEmptyPayload- 启用对墓碑(即删除)消息的支持。 在数据库行删除时,Debezium 可以发送一个墓碑变更事件,该事件具有与已删除行相同的键,且值为Optional.empty。 默认值为false。 -
headerMapper- 自定义HeaderMapper实现,允许选择并将ChangeEvent标头转换为Message标头。 默认的DefaultDebeziumHeaderMapper实现为setHeaderNamesToMap提供 setter 方法。 默认情况下,所有标头都会被映射。 -
taskExecutor- 为 Debezium 引擎设置自定义的TaskExecutor。
以下代码片段展示了该通道适配器的各种配置:
使用 Java 配置进行配置
以下 Spring Boot 应用示例展示了如何使用 Java 配置来配置入站适配器:
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {
Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)
String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)
String payload = new String((byte[]) message.getPayload()); (3)
System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}
}
| 1 | 事件目标逻辑目的地的名称。
通常,目的地由 topic.prefix 配置选项、数据库名和表名组成。例如:my-topic.inventory.orders。 |
| 2 | 包含已更改表的主键架构和已更改行的实际主键。
主键架构及其对应的主键负载均包含一个字段,用于表示在连接器创建事件时已更改表的 PRIMARY KEY(或唯一约束)中的每一列。 |
| 3 | 与键一样,负载(payload)也包含一个模式(schema)部分和一个负载值部分。 模式部分包含描述负载值部分信封(Envelope)结构的模式,包括其嵌套字段。 创建、更新或删除数据的变更事件都具有带有信封结构的值负载。 |
|
|
同样,我们可以配置 DebeziumMessageProducer 以批处理方式处理传入的变更事件:
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}
Debezium Java DSL 支持
spring-integration-debezium 通过 Debezium 工厂和 DebeziumMessageProducerSpec 实现类,提供了便捷的 Java DSL 流畅 API。
Debezium Java DSL 的入站通道适配器是:
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
或者从原生 Debezium 配置属性创建 DebeziumMessageProducerSpec 实例,并默认使用 JSON 序列化格式。
Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
以下 Spring Boot 应用提供了使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}
}