|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
Apache Kafka 支持
概述
Spring Integration for Apache Kafka 基于 Spring for Apache Kafka 项目。
您需要将以下依赖项包含到您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>6.4.10</version>
</dependency>
compile "org.springframework.integration:spring-integration-kafka:6.4.10"
它提供以下组件:
出站通道适配器
出站通道适配器用于将消息从 Spring Integration 通道发布到 Apache Kafka 主题。 该通道在应用上下文中定义,然后连接到向 Apache Kafka 发送消息的应用程序。 发送应用程序可以使用 Spring Integration 消息发布到 Apache Kafka,这些消息由出站通道适配器内部转换为 Kafka 记录,如下所示:
-
Spring Integration 消息的负载用于填充 Kafka 记录的负载。
-
默认情况下,Spring Integration 消息的
kafka_messageKey头用于填充 Kafka 记录的键。
您可以通过 kafka_topic 和 kafka_partitionId 标头分别自定义发布消息的目标主题和分区。
此外,<int-kafka:outbound-channel-adapter> 支持通过在出站消息上应用 SpEL 表达式来提取键、目标主题和目标分区。
为此,它支持三对互斥的属性:
-
topic和topic-expression -
message-key和message-key-expression -
partition-id和partition-id-expression
这些选项允许您分别将 topic、message-key 和 partition-id 作为静态值指定给适配器,或在运行时根据请求消息动态评估它们的值。
The KafkaHeaders interface (provided by spring-kafka) contains constants used for interacting with
headers.
The messageKey and topic default headers now require a kafka_ prefix.
When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers['messageKey']" and topic-expression="headers['topic']" on the <int-kafka:outbound-channel-adapter>.
Alternatively, you can change the headers upstream to the new headers from KafkaHeaders by using a <header-enricher> or a MessageBuilder.
If you use constant values, you can also configure them on the adapter by using topic and message-key. |
注意:如果适配器已使用主题或消息键(通过常量或表达式)进行配置,则这些将被使用,相应的标头将被忽略。 如果您希望标头覆盖配置,则需要在表达式中进行配置,如下所示:
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
适配器需要一个 KafkaTemplate,而该 KafkaTemplate 又需要一个配置适当的 KafkaProducerFactory。
如果提供了 send-failure-channel(sendFailureChannel)并且收到 send() 失败(同步或异步),则会向通道发送 ErrorMessage。
负载是一个包含 failedMessage、record(即 ProducerRecord)和 cause 属性的 KafkaSendFailureException。
您可以通过设置 error-message-strategy 属性来覆盖 DefaultErrorMessageStrategy。
如果提供了 send-success-channel(sendSuccessChannel),则在发送成功后会发送一条负载类型为 org.apache.kafka.clients.producer.RecordMetadata 的消息。
如果您的应用程序使用事务,并且相同的通道适配器用于发布由监听器容器启动的事务的消息,以及在没有现有事务的情况下发布消息,则必须在 transactionIdPrefix 上配置 KafkaTemplate 以覆盖容器或事务管理器使用的默认前缀。
由容器启动的事务(生产者工厂或事务管理器属性)使用的前缀在所有应用程序实例中必须相同。
仅用于生产者的事务使用的前缀在所有应用程序实例中必须是唯一的。 |
您可以配置一个 flushExpression,它必须解析为布尔值。
如果您使用了 linger.ms 和 batch.size Kafka 生产者属性,在发送多条消息后进行刷新可能很有用;该表达式应在最后一条消息上求值为 Boolean.TRUE,并且不完整的批次将立即发送。
默认情况下,该表达式会在 KafkaIntegrationHeaders.FLUSH 消息头中查找 Boolean 值(kafka_flush)。
如果该值为 true,则执行刷新操作;如果该值为 false 或消息头不存在,则不执行刷新。
The KafkaProducerMessageHandler.sendTimeoutExpression 默认值已从 10 秒更改为 Kafka 生产者属性 + 5000,以便将实际的 Kafka 超时错误传播到应用程序,而不是由该框架生成的超时错误。
出于一致性考虑进行了此更改,因为您可能会遇到意外行为(Spring 可能会在发送时超时,而实际上最终是成功的)。
重要提示:该超时的默认值为 120 秒,因此您可能希望将其减少以获得更及时的失败反馈。
配置
以下示例展示了如何为 Apache Kafka 配置出站通道适配器:
-
Java DSL
-
Java
-
XML
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp(m -> 1487694048644L),
e -> e.id("kafkaProducer2")))
);
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
ProducerFactory<Integer, String> producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setSuccessChannel(successes());
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
sync="false"
message-key-expression="'bar'"
send-failure-channel="failures"
send-success-channel="successes"
error-message-strategy="ems"
partition-id-expression="2">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
... <!-- more producer properties -->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
消息驱动通道适配器
The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentListenerContainer.
此外,mode 属性也可用。
它可以接受 record 或 batch 的值(默认值为:record)。
在 record 模式下,每条消息的负载会从单个 ConsumerRecord 转换而来。
在 batch 模式下,负载是一个对象列表,这些对象由消费者轮询返回的所有 ConsumerRecord 实例转换而来。
与批处理的 @KafkaListener 类似,KafkaHeaders.RECEIVED_KEY、KafkaHeaders.RECEIVED_PARTITION、KafkaHeaders.RECEIVED_TOPIC 和 KafkaHeaders.OFFSET 头部也是列表,其位置与负载中的位置相对应。
已接收的消息包含某些预填充的头部信息。
有关更多信息,请参阅 KafkaHeaders 类。
The Consumer object (in the kafka_consumer header) is not thread-safe.
You must invoke its methods only on the thread that calls the listener within the adapter.
If you hand off the message to another thread, you must not call its methods. |
当提供retry-template时,投递失败将根据其重试策略进行重试。
如果同时提供了error-channel,则在重试耗尽后将使用默认的ErrorMessageSendingRecoverer作为恢复回调。
您也可以使用recovery-callback来指定在该情况下采取的其他操作,或将其设置为null以将最终异常抛出给监听器容器,以便在那里进行处理。
在构建 ErrorMessage(用于 error-channel 或 recovery-callback)时,您可以通过设置 error-message-strategy 属性来自定义错误消息。
默认情况下,会使用 RawRecordHeaderErrorMessageStrategy,以便访问转换后的消息以及原始的 ConsumerRecord。
这种重试方式是阻塞的,如果所有轮询记录的重试延迟总和可能超过 max.poll.interval.ms 消费者属性,则可能导致重新平衡。
相反,建议在监听器容器中添加一个 DefaultErrorHandler,并配置为使用 KafkaErrorSendingMessageRecoverer。 |
配置
以下示例展示了如何配置消息驱动通道适配器:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer(c ->
c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaConsumerFactory<>(props);
}
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
mode="record"
retry-template="template"
recovery-callback="callback"
error-message-strategy="ems"
channel="someChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="foo" />
</bean>
</constructor-arg>
</bean>
您也可以使用用于 @KafkaListener 注解的容器工厂,为其他目的创建 ConcurrentMessageListenerContainer 实例。
请参阅 Spring for Apache Kafka 文档 获取示例。
使用 Java DSL,容器无需配置为 @Bean,因为 DSL 会将容器注册为一个 Bean。
以下示例展示了如何实现:
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.id("topic2Adapter"))
...
get();
}
请注意,在这种情况下,适配器被赋予了 id(topic2Adapter)。
该容器以名称 topic2Adapter.container 注册到应用程序上下文中。
如果适配器没有 id 属性,则容器的 Bean 名称是容器的完全限定类名加上 #n,其中 n 对每个容器递增。
入站通道适配器
KafkaMessageSource 提供了一个可轮询的通道适配器实现。
配置
-
Java DSL
-
Kotlin
-
Java
-
XML
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
e -> e.poller(Pollers.fixedDelay(5000)))
.handle(System.out::println)
.get();
}
@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
integrationFlow(Kafka.inboundChannelAdapter(cf,
ConsumerProperties(TEST_TOPIC3).also {
it.groupId = "kotlinMessageSourceGroup"
}),
{ poller(Pollers.fixedDelay(100)) }) {
handle { m ->
}
}
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
consumerProperties.setGroupId("myGroupId");
consumerProperties.setClientId("myClientId");
retunr new KafkaMessageSource<>(cf, consumerProperties);
}
<int-kafka:inbound-channel-adapter
id="adapter1"
consumer-factory="consumerFactory"
consumer-properties="consumerProperties1"
ack-factory="ackFactory"
channel="inbound"
message-converter="converter"
payload-type="java.lang.String"
raw-header="true"
auto-startup="false">
<int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="max.poll.records" value="1"/>
</map>
</constructor-arg>
</bean>
<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
<constructor-arg name="topics" value="topic1"/>
<property name="groupId" value="group"/>
<property name="clientId" value="client"/>
</bean>
请参阅 Javadocs 以获取可用属性。
默认情况下,max.poll.records 必须在消费者工厂中显式设置,或者如果消费者工厂是 DefaultKafkaConsumerFactory,则会被强制设为 1。
您可以将属性 allowMultiFetch 设置为 true 以覆盖此行为。
您必须在 max.poll.interval.ms 内轮询消费者以避免重新平衡。
如果您将值设置为从 allowMultiFetch 到 true,则必须处理所有检索到的记录,并在 max.poll.interval.ms 内再次进行轮询。 |
此适配器发出的消息包含一个名为 kafka_remainingRecords 的头部,其中记录了上一次轮询中剩余的记录数量。
从 6.2 版本开始,KafkaMessageSource 支持在消费者属性中提供的 ErrorHandlingDeserializer。
DeserializationException 会从记录头中提取并传递给调用方。
当启用 SourcePollingChannelAdapter 时,该异常会被包装成 ErrorMessage 并发布到其 errorChannel。
更多详细信息请参阅 ErrorHandlingDeserializer 文档。
出站网关
出站网关用于请求/回复操作。 它与大多数 Spring Integration 网关的不同之处在于,发送线程在网关中不会阻塞,而回复是在回复监听器容器线程上处理的。 如果您的代码在同步的 消息网关 后面调用该网关,则用户线程会在此处阻塞,直到收到回复(或发生超时)。
The KafkaProducerMessageHandler sendTimeoutExpression 默认值为 delivery.timeout.ms 的 Kafka 生产者属性 + 5000,以便在发生超时时将实际的 Kafka 错误传播到应用程序,而不是由本框架生成的超时错误。
为了保持一致性,此设置已更改,因为您可能会遇到意外行为(Spring 可能会超时 send(),而实际上该操作最终是成功的)。
重要提示:该超时默认值为 120 秒,因此您可能希望将其调小以获得更及时的失败反馈。
配置
下面的示例展示了如何配置网关:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
<int-kafka:outbound-gateway
id="allProps"
error-message-strategy="ems"
kafka-template="template"
message-key-expression="'key'"
order="23"
partition-id-expression="2"
reply-channel="replies"
reply-timeout="43"
request-channel="requests"
requires-reply="false"
send-success-channel="successes"
send-failure-channel="failures"
send-timeout-expression="44"
sync="true"
timestamp-expression="T(System).currentTimeMillis()"
topic-expression="'topic'"/>
请参阅 Javadocs 以获取可用属性。
请注意,这里使用的是与 出站通道适配器 相同的类,唯一的区别是传递给构造函数的 KafkaTemplate 是一个 ReplyingKafkaTemplate。
有关更多信息,请参阅 Spring for Apache Kafka 文档。
出站主题、分区、键等与出站适配器的确定方式相同。 回复主题的确定方式如下:
-
一个名为
KafkaHeaders.REPLY_TOPIC的消息头(如果存在,其值必须为String或byte[])会与模板的回复容器的订阅主题进行验证。 -
如果模板的
replyContainer仅订阅了一个主题,则使用该主题。
您也可以指定一个 KafkaHeaders.REPLY_PARTITION 头,以确定用于回复的特定分区。
同样,这将根据模板的回复容器的订阅进行验证。
另外,您也可以使用如下类似的 bean 配置:
@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlow.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
传入网关
入站网关用于请求/回复操作。
配置
以下示例展示了如何配置入站网关:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
return IntegrationFlow
.from(Kafka.inboundGateway(container, replyTemplate)
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
AbstractMessageListenerContainer<Integer, String>container,
KafkaTemplate<Integer, String> replyTemplate) {
KafkaInboundGateway<Integer, String, String> gateway =
new KafkaInboundGateway<>(container, replyTemplate);
gateway.setRequestChannel(requests);
gateway.setReplyChannel(replies);
gateway.setReplyTimeout(30_000);
return gateway;
}
<int-kafka:inbound-gateway
id="gateway1"
listener-container="container1"
kafka-template="template"
auto-startup="false"
phase="100"
request-timeout="5000"
request-channel="nullChannel"
reply-channel="errorChannel"
reply-timeout="43"
message-converter="messageConverter"
payload-type="java.lang.String"
error-message-strategy="ems"
retry-template="retryTemplate"
recovery-callback="recoveryCallback"/>
请参阅 Javadocs 以获取可用属性。
当提供RetryTemplate时,投递失败将根据其重试策略进行重试。
如果同时提供了error-channel,则在重试耗尽后将使用默认的ErrorMessageSendingRecoverer作为恢复回调。
您也可以使用recovery-callback来指定在该情况下采取的其他操作,或将其设置为null以将最终异常抛出给监听器容器,以便在那里进行处理。
在构建 ErrorMessage(用于 error-channel 或 recovery-callback)时,您可以通过设置 error-message-strategy 属性来自定义错误消息。
默认情况下,会使用 RawRecordHeaderErrorMessageStrategy,以便访问转换后的消息以及原始的 ConsumerRecord。
这种重试方式是阻塞的,如果所有轮询记录的重试延迟总和可能超过 max.poll.interval.ms 消费者属性,则可能导致重新平衡。
相反,建议在监听器容器中添加一个 DefaultErrorHandler,并配置为使用 KafkaErrorSendingMessageRecoverer。 |
下面的示例展示了如何使用 Java DSL 配置一个简单的转换为大写的转换器:
或者,您可以通过使用类似以下代码的方式配置一个大写转换器:
@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlow
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
您也可以使用用于 @KafkaListener 注解的容器工厂,为其他目的创建 ConcurrentMessageListenerContainer 实例。
请参阅 Spring for Apache Kafka 文档 和 消息驱动通道适配器 以获取示例。
由 Apache Kafka 主题支持的通道
Spring Integration 拥有 MessageChannel 个由 Apache Kafka 主题支持的持久化实现。
每个通道都需要在发送端使用 KafkaTemplate,对于可订阅的通道则使用监听容器工厂,而对于轮询通道则使用 KafkaMessageSource。
Java DSL 配置
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
...
.get();
}
@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlow.from(...)
...
.publishSubscribeChannel(pubSub(template, containerFactory),
pubsub -> pubsub
.subscribe(subflow -> ...)
.subscribe(subflow -> ...))
.get();
}
@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
.groupId("group2")
.get();
}
@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
KafkaMessageSource<Integer, String> source) {
return IntegrationFlow.from(...)
...
.channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
.handle(..., e -> e.poller(...))
...
.get();
}
/**
* Channel for a single subscriber.
**/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicA");
channel.setGroupId("group1");
return channel;
}
/**
* Channel for multiple subscribers.
**/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicB", true);
channel.setGroupId("group2");
return channel;
}
/**
* Pollable channel (topic is configured on the source)
**/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
KafkaMessageSource<String, String> source)
PollableKafkaChannel channel =
new PollableKafkaChannel(template, source);
channel.setGroupId("group3");
return channel;
}
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
container-factory="containerFactory" />
<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
group-id = "pollableGroup"/>
<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
group-id="pubSubGroup" container-factory="containerFactory" />
消息转换
已提供 StringJsonMessageConverter。
有关更多信息,请参阅 Spring for Apache Kafka 文档。
当使用此转换器与消息驱动通道适配器时,您可以指定要将传入负载转换到的类型。
这是通过在适配器上设置payload-type属性(payloadType属性)来实现的。
以下示例展示了如何在XML配置中执行此操作:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
payload-type="com.example.Thing"
error-channel="errorChannel" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
以下示例展示了如何在 Java 配置中为适配器设置 payload-type 属性(payloadType 属性):
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
return kafkaMessageDrivenChannelAdapter;
}
空负载与日志压缩‘墓碑’记录
Spring Messaging Message<?> 对象不能有 null 个有效负载。
当您使用 Apache Kafka 的端点时,null 个有效负载(也称为墓碑记录)由类型为 KafkaNull 的有效负载表示。
有关更多信息,请参阅 Spring for Apache Kafka 文档。
Spring Integration 端点的 POJO 方法可以使用真正的 null 值而不是 KafkaNull。
为此,请使用 @Payload(required = false) 标记该参数。
以下示例展示了如何实现:
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
从 Spring Integration 流程中调用KStream
您可以使用 MessagingTransformer 从 KStream 调用集成流:
@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
MessagingTransformer<byte[], byte[], byte[]> transformer) transformer) {
KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
...
.transform(() -> transformer)
.to(streamingTopic2);
stream.print(Printed.toSysOut());
return stream;
}
@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
MessagingFunction function) {
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
return new MessagingTransformer<>(function, converter);
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(MessagingFunction.class)
...
.get();
}
当集成流以接口开始时,创建的代理的名称为流 bean 的名称后附加".gateway",因此该 bean 名称可作为 @Qualifier(如果需要)使用。
读取/处理/写入场景的性能考量
许多应用程序从主题中消费数据,执行一些处理,然后将结果写入另一个主题。在大多数情况下,如果 write 失败,应用程序将希望抛出异常,以便可以重试传入的请求和/或将其发送到死信主题。此功能由底层的消息监听器容器以及适当配置的错误处理程序共同支持。然而,为了支持这一点,我们需要阻塞监听器线程,直到写入操作成功(或失败),以便将任何异常抛出给容器。在消费单条记录时,可通过将出站适配器的 sync 属性设置为相应值来实现。然而,在消费批处理时,使用 sync 会导致显著的性能下降,因为应用程序会在发送下一条消息之前等待每条消息的发送结果。您也可以执行多次发送操作,然后在之后等待这些发送的结果。这是通过向消息处理器添加一个 futuresChannel 来实现的。要启用此功能,请将KafkaIntegrationHeaders.FUTURE_TOKEN添加到出站消息中;随后可将其用于将Future与特定发送的消息进行关联。这是如何使用此功能的一个示例:
@SpringBootApplication
public class FuturesChannelApplication {
public static void main(String[] args) {
SpringApplication.run(FuturesChannelApplication.class, args);
}
@Bean
IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
ListenerMode.batch, "inTopic"))
.handle(handler)
.get();
}
@Bean
IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlow.from(Gate.class)
.enrichHeaders(h -> h
.header(KafkaHeaders.TOPIC, "outTopic")
.headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.futuresChannel("futures"))
.get();
}
@Bean
PollableChannel futures() {
return new QueueChannel();
}
}
@Component
@DependsOn("outbound")
class Handler {
@Autowired
Gate gate;
@Autowired
PollableChannel futures;
public void handle(List<String> input) throws Exception {
System.out.println(input);
input.forEach(str -> this.gate.send(str.toUpperCase()));
for (int i = 0; i < input.size(); i++) {
Message<?> future = this.futures.receive(10000);
((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
}
}
}
interface Gate {
void send(String out);
}