|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
SFTP 入站通道适配器
SFTP 入站通道适配器是一种特殊的监听器,它连接到服务器并监听远程目录事件(例如创建新文件),此时它将启动文件传输。 以下示例展示了如何配置 SFTP 入站通道适配器:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
上述配置示例展示了如何为各种属性提供值,包括以下内容:
-
local-directory: 文件将被传输到的位置 -
remote-directory: 要从中传输文件的远程源目录 -
session-factory: 对之前配置的 bean 的引用
默认情况下,传输的文件会保留与原始文件相同的名称。
如果您希望覆盖此行为,可以设置 local-filename-generator-expression 属性,该属性允许您提供一个 SpEL 表达式来生成本地文件的名称。
与出站网关和适配器不同(在那些场景中,SpEL 求值上下文的根对象是 Message),此入站适配器在求值时尚未拥有消息,因为它最终生成的正是以传输文件为有效负载的消息。
因此,SpEL 求值上下文的根对象是远程文件的原始名称(一个 String)。
入站通道适配器首先将文件检索到本地目录,然后根据轮询配置逐个发出每个文件。
从 5.0 版本开始,当需要新文件检索时,您可以限制从 SFTP 服务器获取的文件数量。
当目标文件较大或在具有持久化文件列表过滤器的集群系统中运行时(本节稍后将讨论),这可能会有所帮助。
为此请使用 max-fetch-size。
负值(默认值)表示无限制,所有匹配的文件都将被检索。
有关更多信息,请参阅 入站通道适配器:控制远程文件获取。
自 5.0 版本起,您还可以通过设置 scanner 属性,向 inbound-channel-adapter 提供自定义的 DirectoryScanner 实现。
从 Spring Integration 3.0 开始,您可以指定 preserve-timestamp 属性(默认值为 false)。
当设置为 true 时,本地文件的修改时间戳将被设置为从服务器检索到的值。
否则,它将设置为当前时间。
从版本 4.2 开始,您可以使用 remote-directory-expression 替代 remote-directory,从而在每次轮询时动态确定目录——例如 remote-directory-expression="@myBean.determineRemoteDir()"。
有时,仅通过 filename-pattern 属性指定的简单模式进行文件过滤可能不够用。
如果是这种情况,您可以使用 filename-regex 属性指定一个正则表达式(例如 filename-regex=".*\.test$")。
如果您需要完全的控制权,可以使用 filter 属性提供一个对 org.springframework.integration.file.filters.FileListFilter 自定义实现的引用,org.springframework.integration.file.filters.FileListFilter 是一个用于过滤文件列表的策略接口。
该过滤器决定了哪些远程文件将被获取。
您还可以通过使用 CompositeFileListFilter 将基于模式的过滤器与其他过滤器(例如 AcceptOnceFileListFilter,以避免同步之前已获取的文件)组合起来。
AcceptOnceFileListFilter将其状态存储在内存中。
如果您希望状态在系统重启后仍然存在,请考虑使用SftpPersistentAcceptOnceFileListFilter代替。
此过滤器将接受的文件名存储在一个MetadataStore策略的实例中(请参阅元数据存储)。
此过滤器根据文件名和远程修改时间进行匹配。
自 4.0 版本起,此过滤器需要一个 ConcurrentMetadataStore。
当与共享数据存储(例如使用 Redis 配合 RedisMetadataStore)一起使用时,它允许过滤器键在多个应用程序或服务器实例之间共享。
从 5.0 版本开始,SftpPersistentAcceptOnceFileListFilter 与内存中的 SimpleMetadataStore 默认应用于 SftpInboundFileSynchronizer。
此过滤器也与应用了 XML 配置中的 regex 或 pattern 选项的情况一起使用,同时也可通过 Java DSL 中的 SftpInboundChannelAdapterSpec 进行应用。
您可以通过使用 CompositeFileListFilter(或 ChainFileListFilter)来处理任何其他用例。
上述讨论涉及在检索文件之前对文件进行过滤。 一旦文件被检索,将对文件系统上的文件应用额外的过滤器。 默认情况下,这是一个`AcceptOnceFileListFilter`,如本节所述,它在内存中保留状态且不考虑文件的修改时间。 除非您的应用程序在处理后删除了文件,否则适配器在应用程序重启后将默认重新处理磁盘上的文件。
此外,如果您配置filter使用SftpPersistentAcceptOnceFileListFilter,并且远程文件的时间戳发生变化(导致重新获取),默认的本地过滤器将不允许处理此新文件。
有关此筛选器的更多信息及其用法,请参阅 远程持久文件列表筛选器。
您可以使用 local-filter 属性来配置本地文件系统过滤器的行为。
从版本 4.3.8 开始,默认配置为 FileSystemPersistentAcceptOnceFileListFilter。
该过滤器将接受的文件名和修改时间戳存储在 MetadataStore 策略的实例中(参见 元数据存储),并检测本地文件修改时间的变化。
默认的 MetadataStore 是一个 SimpleMetadataStore,它将状态存储在内存中。
自版本 4.1.5 起,这些过滤器拥有一个名为 flushOnUpdate 的新属性,该属性会导致它们在每次更新时刷新元数据存储(如果该存储实现了 Flushable)。
此外,如果您使用分布式 MetadataStore(例如 Redis 元数据存储),您可以拥有多个相同的适配器或应用程序实例,并确保只有一个实例处理文件。 |
实际的本地过滤器是一个CompositeFileListFilter,它包含提供的过滤器和一个模式过滤器,用于防止处理正在下载过程中的文件(基于temporary-file-suffix)。
文件以下列后缀进行下载(默认为.writing),当传输完成后,文件会被重命名为其最终名称,从而使它们对过滤器“可见”。
有关这些属性的更多详细信息,请参阅模式。
SFTP 入站通道适配器是一个轮询消费者。
因此,您必须配置一个轮询器(可以是全局默认值或本地元素)。
一旦文件被传输到本地目录,就会生成一个负载类型为 java.io.File 的消息,并将其发送到由 channel 属性标识的通道。
从版本 6.2 开始,您可以使用 SftpLastModifiedFileListFilter 基于最后修改策略来过滤 SFTP 文件。
此过滤器可以通过配置 age 属性,使得只有早于该值的文件才会通过过滤器。
默认年龄为 60 秒,但您应选择足够大的年龄值,以避免因网络故障等原因过早捕获文件。
请查阅其 Javadoc 以获取更多信息。
有关文件过滤和大文件的更多信息
有时,刚出现在监控(远程)目录中的文件可能尚未完成。
通常,此类文件会使用某种临时扩展名写入(例如名为 something.txt.writing 的文件使用 .writing 作为扩展名),然后在写入过程完成后重命名。
在大多数情况下,开发者只关心已完成的文件,并希望仅过滤这些文件。
为处理这些场景,您可以使用 filename-pattern、filename-regex 和 filter 属性提供的过滤支持。
如果您需要自定义过滤器实现,可以通过设置 filter 属性在适配器中包含引用。
以下示例展示了如何实现:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
从故障中恢复
您应该理解适配器的架构。
文件同步器会获取文件,并且一个 FileReadingMessageSource 会为每个已同步的文件发送一条消息。
正如前文所述,涉及两个过滤器。
filter 属性(及模式)指的是远程(SFTP)文件列表,以避免获取已经获取过的文件。
FileReadingMessageSource 使用 local-filter 来确定哪些文件应作为消息发送。
同步器会列出远程文件并查询其过滤器。
随后进行文件传输。
如果在文件传输过程中发生IO错误,则会将已添加到过滤器中的任何文件移除,以便它们在下次轮询时能够重新获取。
这仅适用于实现了ReversibleFileListFilter的过滤器(例如AcceptOnceFileListFilter)。
如果在同步文件后,下游流在处理文件时发生错误,则不会自动回滚过滤器,因此默认情况下失败的文件不会被重新处理。
如果您希望在失败后重新处理此类文件,可以使用以下类似的配置,以便从过滤器中移除失败的文件:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 ResettableFileListFilter。
从版本 5.0 开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。
这也可以是一个远程子路径。
为了能够根据层级支持递归读取本地目录以进行修改,现在可以提供一个内部的 FileReadingMessageSource,并使用基于 Files.walk() 算法的新 RecursiveDirectoryScanner。
有关更多信息,请参见 AbstractInboundFileSynchronizingMessageSource.setScanner()。
此外,您现在可以使用 setUseWatchService() 选项将 AbstractInboundFileSynchronizingMessageSource 切换为基于 WatchService 的 DirectoryScanner。
它也已配置为所有 WatchEventType 实例,以便对本地目录中的任何修改做出响应。
前面所示的重处理示例基于 FileReadingMessageSource.WatchServiceDirectoryScanner 的内置功能,该功能在文件从本地目录被删除时(StandardWatchEventKinds.ENTRY_DELETE)会使用 ResettableFileListFilter.remove()。
有关更多信息,请参见 WatchServiceDirectoryScanner。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序示例展示了如何使用 Java DSL 配置入站适配器:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}