|
对于最新稳定版本,请使用 Spring Integration 7.0.0! |
文件聚合器
从5.5版本开始,a文件聚合器引入以覆盖 的另一侧文件分拆器启用START/END标记时的使用场景。为方便起见文件聚合器实现所有三种序列细节策略:
-
这
头部属性相关策略其中FileHeaders.FILENAME属性用于相关性键计算。当标记被启用时文件分拆器它不填充序列详情头部,因为START/END标记消息也包含在序列大小中。 这FileHeaders.FILENAME仍然会为每行发出的,包括START/END标记消息。 -
这
FileMarker释放策略- 检查FileSplitter.FileMarker.Mark.END组内的消息,然后进行比较FileHeaders.LINE_COUNT组大小为减去的头部值2-FileSplitter.FileMarker实例。 它还实现了群体条件提供者联系方式条件提供商用于摘要相关消息处理. 更多信息请参见消息组状态。 -
这
文件聚合消息组处理器只是把它移除了FileSplitter.FileMarker从组中收集消息,收集其余消息到列表负载中进行生成。
以下列表展示了配置文件聚合器:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
integrationFlow {
split(Files.splitter().markers().firstLineAsHeader("firstLine"))
channel { executor(taskExecutor) }
filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
transform(String::toUpperCase)
channel("aggregatorChannel")
aggregate(FileAggregator())
channel { queue("resultChannel") }
}
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new FileAggregator());
aggregator.setOutputChannel(outputChannel);
return aggregator;
}
<int:chain input-channel="input" output-channel="output">
<int-file:splitter markers="true"/>
<int:aggregator>
<bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
</int:aggregator>
</int:chain>
如果 的默认行为文件聚合器不满足目标逻辑,建议配置聚合端点并采用各自策略。 看文件聚合器更多信息请参见JavaDocs。