|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
消息
春季集成消息是数据的通用容器。任何对象都可以作为有效载荷提供,且消息实例包含包含用户可扩展属性的键值对头部。
这消息接口
以下列表展示了消息接口:
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
这消息接口是API的核心部分。通过将数据封装在通用包装器中,消息系统可以在不了解数据类型的情况下传递数据。随着应用程序不断演进以支持新类型,或者类型本身被修改或扩展,消息系统不会受到影响。另一方面,当消息系统中的某个组件确实需要访问关于消息这类元数据通常可以存储到消息头部的元数据中,并从中检索。
消息头
就像 Spring Integration 允许任何对象被用作 的有效载荷消息,它也支持任何对象类型作为头部值。事实上,消息头类实现了java.util.Map_接口,如下类定义所示:
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
尽管消息头类实现地图,实际上是一种只读实现。任何尝试放映射中的一个值得到未支持作异常. 同样的情况也适用于删除和Clear. 由于消息可以传递给多个消费者,因此地图不可修改。同样,消息的有效载荷对象不可能设置在初始创建之后。然而,头部值本身(或有效载荷对象)的可变性被有意留给框架用户自行决定。 |
作为地图,这些头部可以通过调用去(..)并附上头部名称。或者,你也可以提供预期值类作为一个额外的参数。更好的是,在获取预定义值之一时,可以使用方便的获取器。以下示例展示了这三种选项中的每一个:
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
下表描述了预定义的消息头:
| 头部名称 | 头部类型 | 用法 |
|---|---|---|
MessageHeaders.ID |
java.util.UUID |
该消息实例的标识符。每次消息变异时都会变化。 |
MessageHeaders. TIMESTAMP |
java.lang.Long |
消息创建的时间。每次消息变异时都会变化。 |
MessageHeaders. REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
当没有明确的输出通道且没有明确的输出通道时,会发送回复(如果有的话) |
MessageHeaders. ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
错误被发送到的通道。如果值是 |
许多入站和出站适配器实现也提供或期望某些头部,你可以配置额外的用户自定义头部。这些头部的常量可以在那些存在此类头部的模块中找到——例如。Amqp头部,JmsHeaders,依此类推。
消息头访问器应用程序接口
从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息抽象已被迁移到春季消息模,以及消息头访问器API的引入旨在对消息实现提供额外的抽象。所有(核心)Spring Integration专用的消息头部常量现在都声明为集成消息头访问器类。 下表描述了预定义的消息头:
| 头部名称 | 头部类型 | 用法 |
|---|---|---|
IntegrationMessageHeaderAccessor. CORRELATION_ID |
java.lang.Object |
用来关联两条或更多消息。 |
IntegrationMessageHeaderAccessor. SEQUENCE_NUMBER |
java.lang.Integer |
通常是一个序列号,包含一组消息,其中 |
IntegrationMessageHeaderAccessor. SEQUENCE_SIZE |
java.lang.Integer |
一组相关消息中的消息数量。 |
IntegrationMessageHeaderAccessor. EXPIRATION_DATE |
java.lang.Long |
表示消息过期。框架不直接使用,但可用报头丰富器设置,并用于 |
IntegrationMessageHeaderAccessor. PRIORITY |
java.lang.Integer |
消息优先级——例如,在 |
IntegrationMessageHeaderAccessor. DUPLICATE_MESSAGE |
java.lang.Boolean |
如果消息被幂等接收器拦截器检测为重复,则为真。参见幂零接收机企业集成模式。 |
IntegrationMessageHeaderAccessor. CLOSEABLE_RESOURCE |
java.io.Closeable |
如果消息与 |
IntegrationMessageHeaderAccessor. DELIVERY_ATTEMPT |
java.lang. AtomicInteger |
如果消息驱动的通道适配器支持以下配置 |
IntegrationMessageHeaderAccessor. ACKNOWLEDGMENT_CALLBACK |
o.s.i.support. Acknowledgment Callback |
如果入站端点支持,则回调以接受、拒绝或重新排队消息。参见延迟确认可轮询消息源和MQTT手动确认。 |
部分头部提供了方便的类型获取器集成消息头访问器类,如下示例所示:
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
下表描述了也出现在集成消息头访问器但通常不被用户代码使用(即通常被 Spring 集成的内部部分使用——这里包含它们是为了完整性):
| 头部名称 | 头部类型 | 用法 |
|---|---|---|
IntegrationMessageHeaderAccessor. SEQUENCE_DETAILS |
java.util. List<List<Object>> |
当需要嵌套相关时使用一组相关性数据(例如, |
IntegrationMessageHeaderAccessor. ROUTING_SLIP |
java.util. Map<List<Object>, Integer> |
参见路由单。 |
消息ID生成
当消息通过应用程序时,每次发生变异(例如,
通过变换器分配一个新的消息ID。
消息ID是UUID.
从 Spring Integration 3.0 开始,IS生成的默认策略比之前更高效java.util.UUID.randomUUID()实现。
它使用基于安全随机种子的简单随机数,而不是每次都生成安全随机数。
通过声明实现 的豆子,可以选择不同的 UUID 生成策略org.springframework.util.IdGenerator在应用的语境下。
类加载器中只能使用一种 UUID 生成策略。
这意味着,如果两个或多个应用上下文在同一类加载器中运行,它们共享相同的策略。
如果某个情境改变了策略,所有情境都会采用该策略。
如果同一个类加载程序中的两个或多个上下文声明了 的豆子org.springframework.util.IdGenerator它们必须都是同一类的实例。
否则,尝试替换自定义策略的上下文将无法初始化。
如果策略相同但参数化,则使用第一个初始化上下文中的策略。 |
除了默认策略外,还有两个额外策略IdGenerators提供。org.springframework.util.JdkIdGenerator使用前述UUID.randomUUID()机制。
你可以使用o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator当实际上不需要UUID,简单的递增值即可时。
只读头
这MessageHeaders.ID和消息头部.时间戳是只读头,无法覆盖。
自4.3.2版本起,消息构建器提供readOnlyHeaders(String...仅读标题)API,用于自定义不应从上游复制的头部列表消息.
只有MessageHeaders.ID和消息头部.时间戳默认只读。
全球spring.integration.readOnly.headers提供属性(见全局属性)以实现自定义默认信息建设工厂用于框架组件。
当你不想填充一些开箱即用的头部时,这非常有用,比如内容类型由ObjectToJsonTransformer(参见 JSON 变换器)
当你尝试用消息构建器,这种类型的头部被忽略,且信息消息会发送到日志。
头部传播
当消息被消息生成端点(如服务激活器)处理(和修改)时,通常会将入站头传播到出站消息。 一个例外是变换器,当完整消息返回到框架时。 在这种情况下,用户代码负责整个出站消息。 当变换器仅返回有效载荷时,入站头部会被传播。 此外,只有当该头部在外发消息中尚未存在时才会被传播,这样你就可以根据需要更改头部值。
从4.3.10版本开始,你可以配置消息处理程序(负责修改消息并产生输出)来抑制特定头部的传播。
要配置你不想被复制的头部,请调用setNotPropagatedHeaders()或addNotPropagatedHeaders()方法消息生产信息处理器抽象类。
你还可以通过设置仅读首部属性META-INF/spring.integration.properties转换为逗号分隔的首部列表。
从5.0版本开始,setNotPropagatedHeaders()在摘要消息制作处理程序应用简单模式(xxx*,xxx,*xxx或xxx*yyy)以便允许带有常见后缀或前缀的过滤头部。 看PatternMatchUtilsJavadoc更多信息。当其中一个模式为(星号)时,不会传播任何头部。其他所有模式被忽略。在这种情况下,服务激活器的行为与转换器相同,任何必要的头部必须在*消息服役回来了。 这notPropagatedHeaders()选项可在消费者端点规格适用于Java DSL它也可用于<服务激活器>作为非传播头部属性。
消息实现
基础实现消息接口是通用信息<T>,并且提供了两个构造子,如下列表所示:
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
当消息生成一个随机的唯一ID。接受地图Of头部将提供的头部复制到新创建的消息.
还有一个方便的实现消息设计用于传达错误条件。该实现需要可投掷对象作为其有效载荷,如下示例所示:
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
注意,这种实现利用了以下事实通用消息基类是参数化的。因此,如两个例子所示,获取消息有效载荷对象.
这消息构建器辅助类
你可能会注意到消息接口定义了其有效载荷和头部的检索方法,但不提供设置器。原因在于消息在初始创建后无法修改。因此,当消息实例发送给多个消费者(例如,通过发布-订阅通道),如果其中一个消费者需要发送不同类型的回复,必须创建一个新的消息. 因此,其他消费者不会受到这些变化的影响。请记住,多个消费者可能访问相同的负载实例或头部值,而该实例是否不可变则由你自行决定。换句话说,的契约消息实例数类似于不可修改的实例收集,以及消息头地图进一步说明了这一点。尽管消息头类实现java.util.Map,任何试图调用放对 a 进行作(或“移除”或“清除”)。消息头实例结果为未支持作异常.
Spring Integration 无需创建和生成映射才能传递到 GenericMessage 构造器,而是提供了一种更便捷的消息构建方式:消息构建器. 这消息构建器提供两种工厂方法用于创建消息来自现有消息或者与有效载荷一起对象. 从现有架构基础上构建消息,即该的头部和有效载荷消息被复制到新的消息,如下示例所示:
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
如果你需要创建一个消息带着新的有效载荷,但仍想从现有负载复制头部消息你可以使用其中一种“复制”方法,如下示例所示:
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
注意copyHeadersIfAbsent方法不会覆盖已有的值。此外,在前面的示例中,你可以看到如何设置任何用户自定义的头部setHeader. 最后,还有设置为预定义的头部提供方法,以及用于设置任意头部的非破坏性方法(消息头还为预定义的头部名称定义常量)。
你也可以使用消息构建器如下示例所示,以设置消息优先级:
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
这优先权只有在使用 a 时才考虑优先频道(如下一章所述)。它被定义为java.lang.整数.