|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
FTP 入站通道适配器
FTP 入站通道适配器是一个特殊的监听器,它连接到 FTP 服务器并监听远程目录事件(例如新文件创建),此时它将启动文件传输。
以下示例展示了如何配置一个 inbound-channel-adapter:
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
如前所述配置所示,您可以通过使用 inbound-channel-adapter 元素来配置 FTP 入站通道适配器,同时为各种属性提供值,例如 local-directory、filename-pattern(基于简单模式匹配,而非正则表达式),以及对 session-factory 的引用。
默认情况下,传输的文件会保留与原始文件相同的名称。
如果您想覆盖此行为,可以设置 local-filename-generator-expression 属性,该属性允许您提供一个 SpEL 表达式来生成本地文件的名称。
与出站网关和适配器不同(在这些场景中,SpEL 求值上下文的根对象是 Message),此入站适配器在求值时尚未拥有消息,因为它最终会使用传输的文件作为有效负载来生成消息。
因此,SpEL 求值上下文的根对象是远程文件的原始名称(一个 String)。
入站通道适配器首先为本地目录检索 File 对象,然后根据轮询配置发出每个文件。
从 5.0 版本开始,当需要获取新文件时,您可以限制从 FTP 服务器获取的文件数量。
当目标文件非常大或在具有持久化文件列表过滤器(稍后讨论)的集群系统中运行时,这非常有益。
请使用 max-fetch-size 实现此目的。
负值(默认值)表示没有限制,将检索所有匹配的文件。
有关更多信息,请参阅 入站通道适配器:控制远程文件获取。
自 5.0 版本起,您还可以通过设置 scanner 属性,向 inbound-channel-adapter 提供自定义的 DirectoryScanner 实现。
从 Spring Integration 3.0 开始,您可以指定 preserve-timestamp 属性(其默认值为 false)。
当设置为 true 时,本地文件的修改时间戳将被设置为从服务器检索到的值。
否则,它将设置为当前时间。
从版本 4.2 开始,您可以指定 remote-directory-expression 代替 remote-directory,从而在每次轮询时动态确定目录——例如 remote-directory-expression="@myBean.determineRemoteDir()"。
从 4.3 版本开始,您可以省略 remote-directory 和 remote-directory-expression 属性。
它们默认值为 null。
在这种情况下,根据 FTP 协议,客户端工作目录将用作默认远程目录。
有时,基于使用 filename-pattern 属性指定的简单模式进行的文件过滤可能不够用。
如果是这种情况,您可以使用 filename-regex 属性来指定正则表达式(例如 filename-regex=".*\.test$")。
此外,如果您需要完全的控制权,可以使用 filter 属性并提供对任何自定义实现的 o.s.i.file.filters.FileListFilter 的引用,这是一个用于过滤文件列表的策略接口。
此过滤器决定哪些远程文件会被检索。
您还可以通过使用 CompositeFileListFilter 将基于模式的过滤器与其他过滤器(例如用于避免同步先前已获取文件的 AcceptOnceFileListFilter)组合起来。
AcceptOnceFileListFilter将其状态存储在内存中。
如果您希望状态在系统重启后仍然存在,请考虑使用FtpPersistentAcceptOnceFileListFilter代替。
此过滤器将接受的文件名存储在一个MetadataStore策略的实例中(请参阅元数据存储)。
此过滤器根据文件名和远程修改时间进行匹配。
自版本 4.0 起,此过滤器需要一个 ConcurrentMetadataStore。
当与共享数据存储(例如使用 RedisMetadataStore 的 Redis)配合使用时,它允许过滤器键在多个应用程序或服务器实例之间共享。
从版本 5.0 开始,FtpPersistentAcceptOnceFileListFilter 用于内存中的 SimpleMetadataStore 默认应用于 FtpInboundFileSynchronizer。
该过滤器也适用于 XML 配置中的 regex 或 pattern 选项,以及 Java DSL 中的 FtpInboundChannelAdapterSpec。
任何其他用例都可以通过 CompositeFileListFilter(或 ChainFileListFilter)进行管理。
前面的讨论涉及在检索文件之前进行过滤。
一旦文件被检索,将对文件系统上的文件应用额外的过滤器。
默认情况下,这是一个AcceptOnceFileListFilter,如前所述,它在内存中保留状态且不考虑文件的修改时间。
除非您的应用程序在处理后删除文件,否则适配器将在应用程序重启后默认重新处理磁盘上的文件。
此外,如果您配置filter使用FtpPersistentAcceptOnceFileListFilter,并且远程文件的时间戳发生变化(导致重新获取该文件),默认本地过滤器将不允许处理此新文件。
有关此筛选器的更多信息及其用法,请参阅 远程持久文件列表筛选器。
您可以使用 local-filter 属性来配置本地文件系统过滤器的行为。
从 4.3.8 版本开始,默认配置为 FileSystemPersistentAcceptOnceFileListFilter。
该过滤器将接受的文件名和修改时间戳存储在 MetadataStore 策略的实例中(请参阅 元数据存储),并检测本地文件修改时间的变化。
默认的 MetadataStore 是 SimpleMetadataStore,它在内存中存储状态。
自 4.1.5 版本起,这些过滤器新增了一个属性(flushOnUpdate),该属性会导致它们在每次更新时刷新元数据存储(如果该存储实现了 Flushable)。
此外,如果您使用分布式MetadataStore(例如Redis),您可以拥有多个相同的适配器或应用程序实例,并确保每个文件仅被处理一次。 |
实际的本地过滤器是一个包含所提供过滤器的CompositeFileListFilter,以及一个模式过滤器,用于防止处理正在下载的文件(基于temporary-file-suffix)。
文件使用此后缀进行下载(默认为.writing),当传输完成时,文件会被重命名为其最终名称,从而对过滤器变为‘可见’。
remote-file-separator 属性允许您配置一个文件分隔符字符,以便在默认值 '/' 不适用于您的特定环境时使用。
有关这些属性的更多详细信息,请参阅 schema。
您还应理解,FTP 入站通道适配器是一个轮询消费者。
因此,您必须配置一个轮询器(通过使用全局默认值或局部子元素)。
一旦文件传输完成,就会生成一条以其负载为 java.io.File 的消息,并发送到由 channel 属性标识的通道中。
从 6.2 版本开始,您可以使用 FtpLastModifiedFileListFilter 基于最后修改策略过滤 FTP 文件。
此过滤器可通过配置 age 属性来设置,使得仅传递早于该值的文件。
默认年龄为 60 秒,但您应选择足够大的年龄值,以避免因网络波动等原因过早捕获文件。
请查阅其 Javadoc 以获取更多信息。
有关文件过滤和未完成文件的更多信息
有时,刚刚出现在监控(远程)目录中的文件尚不完整。
通常,此类文件会先使用临时扩展名(例如 somefile.txt.writing)进行写入,待写入过程完成后才会重命名。
在大多数情况下,您只关心完整的文件,并希望仅筛选出已完成的文件。
为了处理这些场景,您可以使用由 filename-pattern、filename-regex 和 filter 属性提供的过滤支持。
以下示例使用了自定义过滤器实现:
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
Inbound FTP 适配器轮询配置说明
入站 FTP 适配器的职责包括两项任务:
-
与远程服务器通信,以便将文件从远程目录传输到本地目录。
-
对于每个传输的文件,生成一个以该文件为负载的消息,并将其发送到由 'channel' 属性标识的通道。 这就是它们被称为“通道适配器”(channel adapters)而不是仅仅称为“适配器”的原因。 此类适配器的主要工作是生成要发送到消息通道的消息。 本质上,第二个任务优先执行:如果本地目录中已经存在一个或多个文件,它会首先生成这些文件的消息。 只有当所有本地文件都处理完毕后,它才会发起远程通信以获取更多文件。
此外,在配置轮询器的触发器时,您应密切关注 max-messages-per-poll 属性。
其默认值为 1,适用于所有 SourcePollingChannelAdapter 实例(包括 FTP)。
这意味着,一旦处理完一个文件,它就会等待由您的触发器配置确定的下一次执行时间。
如果您恰好有在一个或多个文件驻留在 local-directory 中,那么它会在与远程 FTP 服务器建立通信之前先处理这些文件。
另外,如果 max-messages-per-poll 设置为 1(默认值),则它每次只处理一个文件,间隔时间由您的触发器定义,本质上相当于“一次轮询 === 一个文件”。
对于典型的文件传输用例,您最可能希望具有相反的行为:即针对每次轮询处理尽可能多的文件,然后才等待下一次轮询。
如果是这种情况,请将 max-messages-per-poll 设置为 -1。
这样,在每次轮询时,适配器将尝试生成尽可能多的消息。
换句话说,它会先处理本地目录中的所有文件,然后连接到远程目录,将那里可用的所有文件传输到本地进行处理。
只有完成这些操作后,轮询操作才被视为完成,轮询器才会等待下一次执行时间。
您也可以将'max-messages-per-poll'值设置为一个正数,该值表示每次轮询时从文件中创建的消息数量的上限。
例如,值为10意味着在每次轮询时,它尝试处理不超过十个文件。
从故障中恢复
理解适配器的架构非常重要。
存在一个文件同步器用于获取文件,以及一个FileReadingMessageSource为每个同步后的文件发送消息。
如前所述,涉及两个过滤器。
filter属性(及模式)引用远程(FTP)文件列表,以避免获取已获取的文件。
local-filter被FileReadingMessageSource用来确定哪些文件应作为消息发送。
同步器列出远程文件并查询其过滤器。
随后传输这些文件。
如果在文件传输过程中发生 IO 错误,则已添加到过滤器的任何文件都将被移除,以便在下一次轮询时能够重新获取。
这仅适用于实现了 ReversibleFileListFilter 的过滤器(例如 AcceptOnceFileListFilter)。
如果在同步文件后,下游流在处理文件时发生错误,则不会自动回滚过滤器,因此默认情况下失败的文件不会被重新处理。
如果您希望在失败后重新处理此类文件,可以使用类似于以下的配置,以便从过滤器中移除失败的文件:
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 ResettableFileListFilter。
从版本 5.0 开始,入站通道适配器可以在本地构建与生成的本地文件名相对应的子目录。
这也可以是一个远程子路径。
为了能够根据层级支持递归读取本地目录以进行修改,您现在可以提供一个内部的 FileReadingMessageSource,并基于 Files.walk() 算法使用新的 RecursiveDirectoryScanner。
有关更多信息,请参阅 AbstractInboundFileSynchronizingMessageSource.setScanner()。
此外,您现在可以通过使用 setUseWatchService() 选项将 AbstractInboundFileSynchronizingMessageSource 切换为基于 WatchService 的 DirectoryScanner。
所有 WatchEventType 实例也已配置为响应本地目录中的任何修改。
前面所示的重处理示例是基于 FileReadingMessageSource.WatchServiceDirectoryScanner 的内置功能,当文件从本地目录被删除(StandardWatchEventKinds.ENTRY_DELETE)时执行 ResettableFileListFilter.remove()。
有关更多信息,请参阅 WatchServiceDirectoryScanner。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置入站适配器:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序示例展示了如何使用 Java DSL 配置入站适配器:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}