|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
文件分割器
FileSplitter 在版本 4.1.2 中引入,其命名空间支持在版本 4.2 中添加。
FileSplitter 基于 BufferedReader.readLine() 将文本文件拆分为单独的行。
默认情况下,拆分器使用 Iterator 按顺序逐行发射从文件中读取的行。
将 iterator 属性设置为 false 会导致它在发射消息之前将所有行读入内存。
这种用法的场景可能是:如果您希望在发送包含行的任何消息之前检测文件上的 I/O 错误。
然而,它仅适用于相对较短的文件。
入站负载可以是File、String(一个File路径)、InputStream或Reader。
其他类型的负载将按原样发出。
以下列表展示了配置 FileSplitter 的可能方式:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
| 1 | 分裂器的 Bean 名称。 |
| 2 | 设置为 true(默认值)以使用迭代器,或设置为 false 在发送行之前将文件加载到内存中。 |
| 3 | 设置为 true 可在文件数据之前和之后发出文件开始和结束标记消息。
标记是负载为 FileSplitter.FileMarker 的消息(其中 mark 属性包含 START 和 END 值)。
当在下游流程中顺序处理文件且某些行被过滤时,您可能需要使用这些标记。
它们使下游处理能够知道文件是否已完全处理。
此外,还会向这些消息添加一个包含 START 或 END 的 file_marker 头信息。
END 标记包含行数计数。
如果文件为空,则仅发出 START 和 END 标记,并将 0 作为 lineCount。
默认值为 false。
当 true 时,apply-sequence 默认为 false。
另请参阅 markers-json(下一个属性)。 |
| 4 | 当markers为true时,将此设置为true,以便将FileMarker对象转换为JSON字符串。
(底层使用SimpleJsonSerializer)。 |
| 5 | 设置为 false 可禁用在消息中包含 sequenceSize 和 sequenceNumber 头信息。
默认值为 true,除非 markers 为 true。
当 true 和 markers 为 true 时,标记将包含在序列中。
当 true 和 iterator 为 true 时,sequenceSize 头信息将被设置为 0,因为大小未知。 |
| 6 | 设置为 true 会在文件中没有行时抛出 RequiresReplyException。
默认值为 false。 |
| 7 | 设置用于将文本数据读取到String有效载荷时的字符集名称。
默认值为平台字符集。 |
| 8 | 作为消息中剩余行携带的标头的第一个行的标头名称。 自 5.0 版本起。 |
| 9 | 设置用于向拆分器发送消息的输入通道。 |
| 10 | 设置发送消息的输出通道。 |
| 11 | 设置发送超时时间。
仅当 output-channel 可能阻塞时适用,例如 QueueChannel 已满时。 |
| 12 | 设置为 false 可禁用在上下文刷新时自动启动分割器。
默认值为 true。 |
| 13 | 如果 input-channel 是 <publish-subscribe-channel/>,请设置此端点的顺序。 |
| 14 | 设置分裂器的启动阶段(当 auto-startup 为 true 时使用)。 |
The FileSplitter 还会将任何基于文本的 InputStream 拆分为行。
从 4.3 版本开始,当与使用 stream 选项检索文件的 FTP 或 SFTP 流式传输入站通道适配器或 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全消耗后自动关闭支持该流的会话。
有关这些功能的更多信息,请参阅 FTP 流式传输入站通道适配器、SFTP 流式传输入站通道适配器,以及 FTP 出站网关 和 SFTP 出站网关。
使用 Java 配置时,可以使用额外的构造函数,如下例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
当 markersJson 为 true 时,标记将表示为 JSON 字符串(使用 SimpleJsonSerializer)。
版本 5.0 引入了 firstLineAsHeader 选项,用于指定内容的第一行为标题(例如 CSV 文件中的列名)。
传递给此属性的参数是:在发出的消息中,第一行将作为标题被携带的标题名称,该标题适用于剩余的行。
该行不会包含在序列标题中(如果 applySequence 为 true),也不会包含在与 FileMarker.END 关联的 lineCount 中。
注意:从版本 5.5 开始,lineCount 也会作为 FileHeaders.LINE_COUNT 包含在 FileMarker.END 消息的标题中,因为 FileMarker 可能会被序列化为 JSON。
如果文件仅包含标题行,则该文件被视为空,因此在拆分时只会发出 FileMarker 个实例(如果启用了标记符;否则,不会发出任何消息)。
默认情况下(如果未设置标题名称),第一行被视为数据,并成为第一个发出消息的有效负载(payload)。
如果您需要从文件内容中提取更复杂的逻辑(不是第一行,不是整行内容,也不是某个特定的头信息,等等),请考虑在 FileSplitter 之前使用 头部增强器。
请注意,已移动到头部的行可能会在正常内容处理流程的下游被过滤掉。
拆分文件的幂等下游处理
当 apply-sequence 为 true 时,分割器会在 SEQUENCE_NUMBER 头部添加行号(当 markers 为 true 时,标记被计为行)。
行号可与 幂等接收器 配合使用,以避免在重启后重新处理行。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}