对于最新稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

文件分流器

文件分拆器在4.1.2版本中加入,其命名空间支持也在4.2版本中加入。 这文件分拆器将文本文件拆分为单独的行,基于BufferedReader.readLine(). 默认情况下,分线器使用迭 代在从文件中读取时,逐行输出。 设置迭 代属性到false它会先将所有行读取到内存中,然后再以消息形式发送。 一个用例可能是你想在发送包含行的消息前检测文件上的I/O错误。 然而,它只适用于相对较短的文件。spring-doc.cadn.net.cn

入站有效载荷可以是文件,字符串(a文件路径),输入流读者. 其他有效载荷类型则保持不变。spring-doc.cadn.net.cn

以下列表展示了配置文件分拆器:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 分水机的豆子名字。
2 设置为true(默认情况下)使用迭代器或false在发送行之前将文件加载到内存中。
3 设置为true在文件数据之前和之后发送文件开始和结束的标记信息。 标记是消息,具有FileSplitter.FileMarker有效载荷(其中开始结束马克财产)。 在下游流程中顺序处理文件时,有些行被过滤,可以用标记。 它们使下游处理能够知道文件是否已完全处理完毕。 此外,还有一个file_marker包含开始结束被添加到这些消息中。 这结束标记包含行数。 如果文件是空的,只有开始结束标记物发射为0作为行计数. 默认为false. 什么时候true,应用序列false默认。 参见markers-json(下一个属性)。
4 什么时候标记是真,设为true拥有FileMarker对象可以转换为 JSON 字符串。 (使用SimpleJsonSerializer在下面)。
5 设置为false以禁用包含序列大小序列号消息中的头部。 默认为true除非标记true. 什么时候true标记true,这些标记被包含在测序中。 什么时候true迭 代true序列大小首部设置为0,因为其大小未知。
6 设置为true导致请求回复例外如果文件中没有行,则会被抛弃。 默认为false.
7 将字符集名称设置为读取文本数据时使用字符串负载。 默认是平台字符集。
8 第一行的头部名称作为后续行消息的头部。 自5.0版本起。
9 设置用于向分路器发送消息的输入信道。
10 设置发送消息的输出信道。
11 设置发送超时。 仅在以下情况下适用。输出通道可以阻挡——例如全封队列通道.
12 设置为false禁用在上下文刷新时自动启动分线器的功能。 默认为true.
13 如果输入通道<发布-订阅-频道/>.
14 为分配器设置启动阶段(使用自动启动true).

文件分拆器还能拆分任何基于文本的内容输入流排成线。 从4.3版本开始,当与使用FTP或SFTP流式入站通道适配器或使用使用以下条件的FTP或SFTP出站网关时,选择获取文件时,分流器会自动关闭支持该流的会话,当文件被完全消耗时 有关这些设施的更多信息,请参见FTP流式入站通道适配器SFTP流式入站通道适配器,以及FTP出站网关SFTP出站网关spring-doc.cadn.net.cn

使用 Java 配置时,还提供了一个额外的构造函数,如下示例所示:spring-doc.cadn.net.cn

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

什么时候markersJson是真的,标记以 JSON 字符串表示(使用SimpleJsonSerializer).spring-doc.cadn.net.cn

5.0 版本引入了firstLineAsHeader选择将第一行内容指定为头部(例如CSV文件中的列名)。 传递给该属性的参数是首行作为头部的名称,在后续行的消息中作为头部。 该行不包含在序列头部(如果应用序列是真的,也不属于行计数FileMarker.END. 注意:从5.5版本开始,lineCount'也作为一个FileHeaders.LINE_COUNT进入FileMarker.END消息,自FileMarker可以序列化为JSON。 如果文件仅包含头部行,则该文件被视为空,因此仅FileMarker实例在分拆过程中会被发射(如果启用了标记——否则则不发送消息)。 默认情况下(如果未设置头部名称),第一行被视为数据,成为第一个发出消息的有效载荷。spring-doc.cadn.net.cn

如果你需要更复杂的头部提取逻辑(不是第一行,不是整行内容,不是某个特定头部等),可以考虑在文件分拆器. 请注意,已移动到头部的行可能会在正常内容处理的下游被过滤。spring-doc.cadn.net.cn

幂零下游处理分拆文件

什么时候应用序列成立时,分线器将行号加到以下SEQUENCE_NUMBER头部(当标记是真的,标记被计为直线)。 线号可以与幂零接收机一起使用,以避免重启后线路的重处理。spring-doc.cadn.net.cn

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}