|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
入站通道适配器:轮询多个服务器和目录
从版本 5.0.7 开始,RotatingServerAdvice 已可用;当配置为轮询建议时,入站适配器可以轮询多个服务器和目录。
像往常一样配置该建议并将其添加到轮询器的建议链中。
DelegatingSessionFactory 用于选择服务器,有关更多信息请参阅 委托会话工厂。
该建议配置由一个 RotationPolicy.KeyDirectory 对象列表组成。
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
此建议将轮询服务器 foo 上的目录 foo,直到不存在新文件,然后移动到目录 bar,接着移动到服务器 two 上的目录 baz,依此类推。
此默认行为可以通过 fair 构造函数参数进行修改:
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
在这种情况下,无论前一次轮询是否返回文件,建议都会移动到下一个服务器/目录。
或者,您可以提供自己的 RotationPolicy 以根据需要重新配置消息源:
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
和
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
local-filename-generator-expression 属性(在同步器上为 localFilenameGeneratorExpression)现在可以包含 #remoteDirectory 变量。
这允许从不同目录检索的文件下载到本地类似的目录中:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
使用此建议时,请勿在轮询器上配置TaskExecutor;有关更多信息,请参阅消息源的有条件轮询器。 |
当并非所有获取的文件都在单个轮询周期内处理完毕,但文件可能被轮换到另一个时,还可以查看便捷的 AbstractRemoteFileStreamingMessageSource.clearFetchedCache() API。
注意:原文中SessionFactory可能指代具体配置项或版本号,此处保留原样;若需更精准翻译,建议结合上下文确认AbstractRemoteFileStreamingMessageSource.clearFetchedCache()和SessionFactory的具体含义。