消息

The Spring Integration Message 是一个通用的数据容器。 任何对象都可以作为载荷提供,并且每个 Message 实例包含标头,其中包含以键值对形式提供的可扩展属性。spring-doc.cadn.net.cn

Message接口

以下代码清单展示了 Message 接口的定义:spring-doc.cadn.net.cn

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message接口是API的核心部分。 通过使用通用包装器封装数据,消息系统可以在不理会数据类型的情况下传递这些数据。 随着应用程序发展以支持新类型或当类型本身被修改或扩展时,消息系统不会受到影响。 另一方面,当消息系统的某些组件需要访问关于Message的信息(如元数据)时,这类信息通常可以存储在消息头的元数据中并从中检索。spring-doc.cadn.net.cn

消息头

正如 Spring Integration 允许任何 Object 作为 Message 的载荷一样,它还支持任何 Object 类型作为头值。 事实上,MessageHeaders 类实现了 java.util.Map_ interface,如下类定义所示:spring-doc.cadn.net.cn

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
尽管 MessageHeaders 类实现了 Map,但它实际上是一个只读实现。 任何尝试在 Map 中 put 值的操作都会导致 UnsupportedOperationException。 同样适用于 removeclear。 由于消息可能被传递给多个消费者,Map 的结构不能被修改。 同样,消息的有效负载 Object 在初始创建后也不能被 set。 然而,标头值本身(或有效负载对象)的可变性被有意留给框架用户自行决定。

作为Map的实现,可以通过调用get(..)并传入头部名称来获取头部信息。 还可以将预期的Class作为一个额外参数提供。 甚至更好的是,在获取预定义值时,方便的方法可以直接使用。 以下示例展示了这三种选项:spring-doc.cadn.net.cn

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

以下表格描述了预定义的消息头:spring-doc.cadn.net.cn

表 1. 预定义消息标头
Header Name header type 用法
 MessageHeaders.ID
 java.util.UUID

此消息实例的标识符。 每次消息发生变更时都会改变。spring-doc.cadn.net.cn

 MessageHeaders.
TIMESTAMP
 java.lang.Long

消息创建的时间。 每次消息发生变更时都会改变。spring-doc.cadn.net.cn

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

一个通道,当没有配置显式的输出通道且不存在 ROUTING_SLIP 或者 ROUTING_SLIP 耗尽时,用于发送回复(如果有)。 如果该值为 String,它必须是一个 bean 名称或由 ChannelRegistry. 生成。spring-doc.cadn.net.cn

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

一个错误发送的通道。 如果该值是String,那么它必须表示一个bean名称或由ChannelRegistry.生成。spring-doc.cadn.net.cn

许多内出站适配器实现也提供或期望某些标头,并且你可以配置额外的用户定义标头。 这些标头的常量可以在存在此类标头的模块中找到——例如。AmqpHeadersJmsHeaders,等等。spring-doc.cadn.net.cn

MessageHeaderAccessorAPI

从Spring Framework 4.0和Spring Integration 4.0开始,核心消息抽象已移至spring-messaging模块,并引入了MessageHeaderAccessor API,以提供对消息实现的额外抽象。spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

所有(core)特定于Spring Integration的消息头常量现在在IntegrationMessageHeaderAccessor类中声明。spring-doc.cadn.net.cn

以下表格描述了预定义的消息头:spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

表 2. 预定义消息头
Header Name header type 用法
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用于关联两个或多个消息。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常,一个序列号可以与一组带有SEQUENCE_SIZE的消息一起使用,也可以在<resequencer/>中用于重新排序一个无界消息组。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

消息组中相关消息的数量。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

表示消息过期的时间。 不是框架直接使用的,但可以通过头部丰富器设置,并在配置了UnexpiredMessageSelector<filter/>中使用。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

消息优先级——例如,在PriorityChannelspring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果消息被幂等接收拦截器检测为重复消息,则返回 true。 请参见 幂等接收企业集成模式spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

此头部仅在消息与应在消息处理完成后关闭的Closeable相关联时出现。 一个例子是使用FTP、SFTP等进行流式文件传输时关联的Sessionspring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果消息驱动的通道适配器支持配置RetryTemplate,那么此头部包含当前传递尝试次数。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果入站端点支持,则可以回调以接受、拒绝或重新排队消息。 请参见 延迟确认可拉取消息源MQTT 手动确认spring-doc.cadn.net.cn

这些头部的一些方便的类型化获取器在IntegrationMessageHeaderAccessor类上提供,如下例所示:spring-doc.cadn.net.cn

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

以下表格描述了在 IntegrationMessageHeaderAccessor 中出现但在用户代码中通常不使用的标头(即,它们通常被 Spring Integration 的内部部分使用——在此处包括它们是为了完整性):spring-doc.cadn.net.cn

表 3. 预定义消息头
Header Name header type 用法
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

当需要嵌套相关性时(例如,splitter→…​→splitter→…​→aggregator→…​→aggregator)使用的相关数据堆栈。spring-doc.cadn.net.cn

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

路由滑槽spring-doc.cadn.net.cn

消息 ID 生成

当消息在应用程序中传输时,每当它被修改(例如,由转换器修改),都会分配一个新的消息ID。 消息ID是一个UUID。 从Spring Integration 3.0开始,默认用于IS生成的策略比之前的java.util.UUID.randomUUID()实现更高效。 它使用基于安全随机种子的简单随机数而不是每次创建一个安全随机数。spring-doc.cadn.net.cn

可以通过在应用程序上下文中声明一个实现UUID生成策略接口的bean来选择不同的UUID生成策略。spring-doc.cadn.net.cn

只有一个 UUID 生成策略可以在一个类加载器中使用。 这意味着如果有两个或多个应用上下文在同一类加载器中运行,它们共享相同的策略。 如果其中一个上下文更改了策略,则所有上下文都将使用该策略。 如果同一类加载器中的两个或多个上下文声明了一个类型为 org.springframework.util.IdGenerator 的 bean,那么它们必须都是同一个类的实例。 否则,尝试替换自定义策略的上下文将无法初始化。 如果策略相同但参数不同,则首先被初始化的上下文中的策略用于所有上下文。

除了默认策略外,还提供了两个额外的IdGeneratorsorg.springframework.util.JdkIdGenerator使用了之前的UUID.randomUUID()机制。 当你不需要UUID而只需要一个简单的递增值时,可以使用o.s.i.support.IdGenerators.SimpleIncrementingIdGeneratorspring-doc.cadn.net.cn

只读标头

MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读标头,无法被覆盖。spring-doc.cadn.net.cn

自版本 4.3.2 起,MessageBuilder 提供了 readOnlyHeaders(String…​ readOnlyHeaders) API,用于自定义不应从上游 Message 复制的标头列表。 默认情况下,只有 MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读的。 全局 spring.integration.readOnly.headers 属性(参见 全局属性)可用于为框架组件自定义 DefaultMessageBuilderFactory。 当您希望不自动填充某些开箱即用的标头(例如由 ObjectToJsonTransformer 提供的 contentType)时,这非常有用(参见 JSON 转换器)。spring-doc.cadn.net.cn

当您尝试使用MessageBuilder构建新消息时,这种头会被忽略,并且会向日志中发出一个特定的INFO消息。spring-doc.cadn.net.cn

从 5.0 版本开始,消息网关头部增强器内容增强器头部过滤器在使用DefaultMessageBuilderFactory时不再允许配置MessageHeaders.IDMessageHeaders.TIMESTAMP头部名称,并会抛出BeanInitializationException异常。spring-doc.cadn.net.cn

标题传播

当消息被消息生成端点(如一个服务激活器)处理和修改时,通常情况下,入站头会被传递到出站消息。 但是有一个例外情况:当一个完全的消息返回给框架时,此时用户代码负责整个出站消息。 在这种情况下,如果入站头部已经在出站消息中存在,则不会进行传递。这样你就可以根据需要修改头部值。 当一个转换器只是返回负载内容时,入站头会被传递。spring-doc.cadn.net.cn

自 Spring 框架 4.3.10 版本起,您可以配置消息处理器(修改消息并生成输出)以抑制特定头部的传播。 要配置您不希望复制的头部,请在 MessageProducingMessageHandler 抽象类上调用 setNotPropagatedHeaders()addNotPropagatedHeaders() 方法。spring-doc.cadn.net.cn

您也可以通过将readOnlyHeaders属性在META-INF/spring.integration.properties中设置为逗号分隔的头部列表,来全局抑制特定消息头的传播。spring-doc.cadn.net.cn

从版本 5.0 开始,setNotPropagatedHeaders()AbstractMessageProducingHandler 上的实现应用简单模式(xxx**xxx*xxx*xxx*yyy),以允许过滤具有公共后缀或前缀的标头。 有关更多信息,请参阅 PatternMatchUtils Javadoc。 当其中一个模式为 *(星号)时,不会传播任何标头。 所有其他模式均被忽略。 在这种情况下,服务激活器的行为与转换器相同,并且任何必需的标头必须包含在服务方法返回的 Message 中。 notPropagatedHeaders() 选项在 Java DSL 的 ConsumerEndpointSpec 中可用。 它也可用于 <service-activator> 组件的 XML 配置,作为 not-propagated-headers 属性。spring-doc.cadn.net.cn

Header propagation suppression 不适用于那些不修改消息的端点,例如 桥接器路由器

消息实现

The base implementation of the Message interface is GenericMessage<T>, and it provides two constructors, shown in the following listing:spring-doc.cadn.net.cn

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

当创建一个 Message 时,会生成一个随机唯一ID。 接受一个 Map 头部的构造函数会将提供的头部复制到新创建的 Message 中。spring-doc.cadn.net.cn

也有一个方便的实现 Message,用于通信错误条件。 这个实现将一个 Throwable 对象作为其负载,如下例所示:spring-doc.cadn.net.cn

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了GenericMessage基类泛型化的事实。因此,正如两个示例所示,在获取Message负载Object时无需进行类型转换。spring-doc.cadn.net.cn

所述的Message类实现是不可变的。 在某些情况下,当不关心可变性且应用程序的设计逻辑能够避免并发修改时,可以使用MutableMessagespring-doc.cadn.net.cn

MessageBuilder辅助类

您可能会注意到Message接口定义了用于获取其负载和标头的方法,但没有提供设置器。这是原因在于,一旦Message被创建,就无法对其进行修改。因此,当一个Message实例被多个消费者接收(例如,通过发布-订阅通道),如果其中某个消费者需要发送带有不同负载类型的回复,则必须创建一个新的Message。因此,其他消费者不会受到这些变化的影响。请记住,多个消费者可能会访问同一个负载实例或头信息值,至于该实例是否本身是不可变的,则由您自行决定。换句话说,`Message` 实例的合约类似于不可修改的 `Collection`,而 `MessageHeaders` 映射进一步举例说明了这一点。尽管 MessageHeaders 类实现了 java.util.Map,但在 MessageHeaders 实例上尝试调用 put 操作(或 'remove' 或 'clear')会导致 UnsupportedOperationExceptionspring-doc.cadn.net.cn

而不是要求创建并填充一个Map来传递给GenericMessage构造函数,Spring Integration确实提供了一种更方便的方式来构建消息:MessageBuilderMessageBuilder提供了两个工厂方法,可以从现有的Message或带有负载Object来创建Message实例。 当从现有Message构建时,该Message的头信息和负载将被复制到新Message中,如以下示例所示:spring-doc.cadn.net.cn

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果需要创建一个带有新负载的Message,但仍希望从现有的Message复制头信息,可以使用其中一个'copy'方法,如下例所示:spring-doc.cadn.net.cn

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

注意,copyHeadersIfAbsent方法不会覆盖现有值。 此外,在前面的示例中,您可以看到如何使用setHeader设置任何自定义头。 最后,还有set个方法可供设置预定义头,并且有一个非破坏性的方法用于设置任何头(MessageHeaders还为预定义头名称定义了常量)。spring-doc.cadn.net.cn

您也可以使用MessageBuilder来设置消息的优先级,如下例所示:spring-doc.cadn.net.cn

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

The priority 头部仅在使用 PriorityChannel 时考虑(如下一章所述)。 它被定义为一个 java.lang.Integerspring-doc.cadn.net.cn

MutableMessageBuilder 是为了处理 MutableMessage 实例。 该类的逻辑是创建一个 MutableMessage 或者保持不变,并通过构建器方法修改其内容。 这样,在消息交换不关心不可变性的情况下,可以在运行应用中获得轻微的性能提升。spring-doc.cadn.net.cn

从6.4版本开始,从MessageBuilder中提取了一个BaseMessageBuilder类以简化默认消息构建逻辑的扩展。 例如,与自定义MessageBuilderFactory一起使用时,可以在应用程序上下文中全局使用一个自定义BaseMessageBuilder实现来提供自定义Message实例。 特别是,可以通过重写GenericMessage.toString()方法在记录此类消息时隐藏payload和headers中的敏感信息。

MessageBuilderFactory抽象

带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAMEMessageBuilderFactory Bean 被全局注册到应用程序上下文中,并在框架中用于创建 Message 实例。 默认情况下,它是 DefaultMessageBuilderFactory 的一个实例。 开箱即用,框架还提供了一个 MutableMessageBuilderFactory,以便在框架组件中创建 MutableMessage 实例。 若要自定义 Message 实例的创建方式,必须在目标应用程序上下文中提供一个具有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAMEMessageBuilderFactory Bean,以覆盖默认实现。 例如,可以注册一个自定义的 MessageBuilderFactory,用于 BaseMessageBuilder 的实现,其中我们希望提供一个 GenericMessage 扩展,通过重写 toString() 来在记录此类消息时隐藏有效负载和标头中的敏感信息。spring-doc.cadn.net.cn

一些快速实现这些类以演示个人可识别信息缓解可以像这样:spring-doc.cadn.net.cn

class PiiMessageBuilderFactory implements MessageBuilderFactory {

	@Override
	public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
	    return new PiiMessageBuilder<>(message.getPayload(), message);
	}

	@Override
	public <T> PiiMessageBuilder<T> withPayload(T payload) {
	    return new PiiMessageBuilder<>(payload, null);
	}

}

class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {

    public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
        super(payload, originalMessage);
    }

    @Override
    public Message<P> build() {
        return new PiiMessage<>(getPayload(), getHeaders());
    }

}

class PiiMessage<P> extends GenericMessage<P> {

    @Serial
    private static final long serialVersionUID = -354503673433669578L;

    public PiiMessage(P payload, Map<String, Object> headers) {
        super(payload, headers);
    }

    @Override
    public String toString() {
        return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
    }

    private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
        return headers.entrySet()
                .stream()
                .map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

}

然后这个PiiMessageBuilderFactory可以注册为一个bean,每当框架记录消息时(例如在errorChannel的情况下),password头部将会被遮掩。spring-doc.cadn.net.cn