|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
Debezium 支持
Debezium 引擎,变更数据采集(CDC)入站通道适配器。
这DebeziumMessageProducer允许捕获数据库变更事件,将其转换为消息,并随后向出站频道流式传输。
你需要在项目中包含 Spring 集成 Debezium 的依赖性:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-debezium</artifactId>
<version>6.3.11</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.3.11"
你还需要为输入数据库添加 debezium 连接器的依赖。 例如,要在PostgreSQL中使用Debezium,你需要postgres debezium连接器:
-
Maven
-
Gradle
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"
|
替换 |
入站Debezium通道适配器
Debezium 适配器期望预配置DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>实例。
|
debezium提供商提供一个开箱即用的方案 |
|
Debezium Java DSL 可以创建 |
此外,DebeziumMessageProducer可以通过以下配置属性进行调谐:
-
内容类型- 允许处理JSON(默认),阿弗罗和普罗托布夫消息内容。 内容类型必须与序列化格式配置为所提供的Debezium引擎。建造者. -
enableBatch- 当设置为false(默认情况下),Debezium适配器会发送新的消息对于每个变更事件从源数据库接收到的数据变更事件。 如果设置为true然后适配器向下发送单一消息对于每批变更事件由Debezium发动机获得。 此类载荷不可序列化,需自定义序列化/反序列化实现。 -
enableEmptyPayload- 支持墓碑式(即删除)消息。 在数据库行删除中,Debezium 可以发送一个墓碑变更事件,该事件与已删除行具有相同的键,且值为Optional.empty. 默认false. -
头部映射器-习惯头部映射器实现允许选择和转换变更事件标题消息头。 默认DefaultDebeziumHeaderMapper实现为 提供了setHeaderNamesToMap. 默认情况下,所有头部都是映射的。 -
任务执行者- 设置自定义任务执行者用于Debezium发动机。
以下代码片段展示了该通道适配器的各种配置:
使用 Java 配置配置
以下 Spring Boot 应用程序展示了如何用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {
Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)
String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)
String payload = new String((byte[]) message.getPayload()); (3)
System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}
}
| 1 | 事件所指的逻辑目的地名称。
通常目的地由以下组成topic.前缀配置选项、数据库名称和表名称。例如:我的主题。库存。命令. |
| 2 | 包含更改表键和修改后行实际键的模式。
密钥模式及其对应的密钥有效载荷都包含对应更改表中每一列的字段主键(或唯一约束)在连接器创建事件时。 |
| 3 | 与密钥类似,有效载荷有模式部分和有效载荷值值部分。 模式部分包含描述有效载荷值部分包络结构的模式,包括其嵌套字段。 用于创建、更新或删除数据的作的变更事件,所有的值有效载荷都带有包络结构。 |
|
这 |
类似地,我们可以配置DebeziumMessageProducer批量处理接收的变更事件:
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}
Debezium Java DSL 支持
这Spring积分-德贝齐姆通过德贝齐姆工厂和DebeziumMessageProducerSpec实现。
Debezium Java DSL 的入站通道适配器是:
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
或者创建一个DebeziumMessageProducerSpec实例来自本地 DeBezium 配置属性,默认为JSON序列化格式。
Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
以下 Spring Boot 应用程序示例了如何用 Java DSL 配置入站适配器:
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}
}