|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
读取文件
可以使用 FileReadingMessageSource 从文件系统消费文件。
这是 MessageSource 的一个实现,用于从文件系统目录创建消息。
以下示例展示了如何配置 FileReadingMessageSource:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
为防止为特定文件创建消息,您可以提供 FileListFilter。
默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter -
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter 确保不处理隐藏文件。
请注意,隐藏的确切定义取决于系统。
例如,在基于 UNIX 的系统中,以点号开头的文件被视为隐藏文件。
而 Microsoft Windows 则使用专用的文件属性来标识隐藏文件。
|
版本 4.2 引入了 |
AcceptOnceFileListFilter 确保文件仅从目录中选取一次。
|
自 4.0 版本起,此过滤器需要一个 自 4.1.5 版本起,此过滤器新增了一个属性( |
持久化文件列表过滤器现在有一个布尔属性 forRecursion。
将此属性设置为 true,也会设置 alwaysAcceptDirectories,这意味着对外部网关(ls 和 mget)的递归操作现在将每次遍历完整的目录树。
这是为了解决目录树深处更改未被检测到的问题。
此外,forRecursion=true 会导致使用文件的完整路径作为元数据存储键;这解决了如果同一名称的文件出现在不同目录中多次时过滤器无法正常工作的问题。
重要提示:这意味着持久化元数据存储中的现有键将无法在顶层目录下的文件中找到。
因此,该属性默认值为 false;此行为可能在未来的版本中发生变化。
以下示例使用过滤器配置了 FileReadingMessageSource:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件时的一个常见问题是,文件可能在尚未准备好时就被检测到(即其他进程可能仍在写入该文件)。
默认的 AcceptOnceFileListFilter 无法防止这种情况。
在大多数情况下,如果文件写入过程在文件准备好后立即重命名每个文件,则可以避免此问题。
组合使用默认值 AcceptOnceFileListFilter 与仅接受已就绪文件(例如基于已知后缀)的 filename-pattern 或 filename-regex 过滤器,可以应对这种情形。
CompositeFileListFilter 可启用组合功能,如下示例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种替代方案。
版本 4.2 添加了 LastModifiedFileListFilter。
可以通过 age 属性配置此过滤器,以便仅传递早于该值的文件。
默认年龄为 60 秒,但您应选择足够大的年龄值,以避免过早获取文件(例如由于网络故障)。
以下示例展示了如何配置 LastModifiedFileListFilter:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从版本 4.3.7 开始,引入了 ChainFileListFilter(作为 CompositeFileListFilter 的扩展),以支持后续过滤器仅查看前一个过滤器结果的场景。
(使用 CompositeFileListFilter 时,所有过滤器都能看到所有文件,但只会传递通过所有过滤器的文件)。
需要新行为的一个示例是 LastModifiedFileListFilter 和 AcceptOnceFileListFilter 的组合,即我们希望在经过一段时间后才接受文件。
使用 CompositeFileListFilter 时,由于 AcceptOnceFileListFilter 在首次遍历时会查看所有文件,因此当其他过滤器处理时它不会再传递该文件。
CompositeFileListFilter 方法适用于将模式过滤器与自定义过滤器结合使用的场景,后者用于查找辅助文件以指示文件传输已完成。
模式过滤器可能仅传递主文件(例如 something.txt),但“完成”过滤器需要检查(例如)something.done 是否存在。
假设我们拥有文件 a.txt、a.done 和 b.txt。
模式过滤器仅传递 a.txt 和 b.txt,而“完成”过滤器会查看所有三个文件并仅传递 a.txt。
复合过滤器的最终结果是仅释放 a.txt。
使用 ChainFileListFilter,如果链中的任何过滤器返回空列表,则不会调用剩余的过滤器。 |
版本 5.0 引入了一个 ExpressionFileListFilter,用于将 SpEL 表达式针对文件执行,并将该文件作为上下文评估根对象。
为此,所有用于文件处理(本地和远程)的 XML 组件,连同现有的 filter 属性,都已提供 filter-expression 选项,如下示例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
版本 5.0.5 引入了对被拒绝文件感兴趣的 DiscardAwareFileListFilter 实现。
为此,应通过 addDiscardCallback(Consumer<File>) 向此类过滤器实现提供一个回调。
在框架中,此功能由 FileReadingMessageSource.WatchServiceDirectoryScanner 使用,并与 LastModifiedFileListFilter 结合使用。
与常规 DirectoryScanner 不同,WatchService 根据目标文件系统上的事件提供待处理的文件。
在轮询包含这些文件的内部队列时,LastModifiedFileListFilter 可能会丢弃它们,因为它们相对于其配置的 age 太新。
因此,我们失去了该文件以供未来可能的考虑。
丢弃回调钩子使我们能够将该文件保留在内部队列中,以便在后续轮询时针对 age 进行检查。
CompositeFileListFilter 还实现了 DiscardAwareFileListFilter,并向其所有 DiscardAwareFileListFilter 委托对象填充了丢弃回调。
由于 CompositeFileListFilter 会将文件与所有委托进行匹配,因此 discardCallback 可能会针对同一文件被多次调用。 |
从版本 5.1 开始,FileReadingMessageSource 不会检查目录是否存在,也不会创建它,直到其 start() 被调用(通常通过包装 SourcePollingChannelAdapter 实现)。
此前,在引用目录时(例如来自测试或稍后应用权限时),没有简单的方法可以防止操作系统权限错误。
消息头
从版本 5.0 开始,FileReadingMessageSource(除了作为轮询 File 的 payload 之外)会填充以下标头到出站 Message:
-
FileHeaders.FILENAME: 要发送文件的File.getName()。 可用于后续的重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE:File对象本身。 通常,当我们丢失原始File对象时,该头部会自动由框架组件(例如 拆分器 或 转换器)填充。 然而,为了与其他任何自定义用例保持一致性和便利性,此头部可用于访问原始文件。 -
FileHeaders.RELATIVE_PATH: 引入一个新标头,用于表示扫描路径中相对于根目录的部分。 当需要在其他位置恢复源目录层次结构时,此标头可能非常有用。 为此,可以配置DefaultFileNameGenerator(参见“生成文件名”)以使用此标头。
目录扫描与轮询
FileReadingMessageSource 不会立即为来自该目录的文件生成消息。它使用内部队列来处理由 scanner 返回的“符合条件的文件”。scanEachPoll 选项用于确保内部队列在每次轮询时都刷新为最新的输入目录内容。默认情况下(scanEachPoll = false),FileReadingMessageSource会在再次扫描目录之前清空其队列。这种默认行为特别有助于减少扫描目录中大量文件。然而,在需要自定义排序的情况下,重要的是要考虑将此标志设置为 true 所产生的影响。文件的处理顺序可能与预期不符。默认情况下,队列中的文件按其自然(path)顺序处理。即使队列中已有文件,扫描新增的文件也会插入到适当的位置,以维持自然顺序。要自定义顺序,FileReadingMessageSource 可以接受一个 Comparator<File> 作为构造函数参数。它被内部(PriorityBlockingQueue)用于根据业务需求重新排序其内容。因此,若要按特定顺序处理文件,您应向 FileReadingMessageSource 提供一个比较器,而不是对由自定义 DirectoryScanner 生成的列表进行排序。
版本 5.0 引入了 RecursiveDirectoryScanner 用于执行文件树遍历。
该实现基于 Files.walk(Path start, int maxDepth, FileVisitOption… options) 功能。
根目录(DirectoryScanner.listFiles(File))参数不包含在结果中。
所有其他子目录的包含与排除均基于目标 FileListFilter 实现。
例如,SimplePatternFileListFilter 默认会过滤掉目录。
有关更多信息,请参阅 AbstractDirectoryAwareFileListFilter 及其实现。
从版本 5.5 开始,Java DSL 的 FileInboundChannelAdapterSpec 提供了一个方便的 recursive(boolean) 选项,允许在目标 FileReadingMessageSource 中使用 RecursiveDirectoryScanner,而不是默认的那个。 |
命名空间支持
通过使用文件特定的命名空间,可以简化文件读取的配置。 为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间内,您可以将 FileReadingMessageSource 减少并将其封装在一个入站通道适配器中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的 FileListFilter 实现:
-
IgnoreHiddenFileListFilter(不处理隐藏文件) -
AcceptOnceFileListFilter(防止重复)
因此,您也可以省略 prevent-duplicates 和 ignore-hidden 属性,因为它们默认是 true。
|
Spring Integration 4.2 引入了 |
第二个通道适配器示例使用自定义过滤器,第三个使用 filename-pattern 属性添加基于 AntPathMatcher 的过滤器,第四个使用 filename-regex 属性向 FileReadingMessageSource 添加基于正则表达式模式的过滤器。
filename-pattern 和 filename-regex 属性分别与普通的 filter 引用属性互斥。
然而,您可以使用 filter 属性来引用 CompositeFileListFilter 的一个实例,该实例可以组合任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。
当多个进程从同一目录读取时,您可能希望锁定文件以防止它们被并发拾取。
为此,您可以使用 FileLocker。
存在一个基于 java.nio 的实现,但您也可以自行实现锁定方案。
nio 锁可以通过以下方式注入:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以按如下方式配置自定义锁:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件传入适配器配置了锁时,它负责在允许接收文件之前获取锁。
它不承担解锁文件的责任。
如果您已处理文件但让锁一直挂起,就会导致内存泄漏。
如果这是一个问题,您应该在适当的时候自行调用 FileLocker.unlock(File file)。 |
当过滤和锁定文件不足以满足需求时,您可能需要完全控制文件的列出方式。
要实现此类需求,您可以使用 DirectoryScanner 的实现类。
该扫描器使您能够精确确定每次轮询中列出的文件。
这也是 Spring Integration 内部用于将 FileListFilter 实例和 FileLocker 连接到 FileReadingMessageSource 的接口。
您可以像以下示例所示,将自定义的 DirectoryScanner 注入到 <int-file:inbound-channel-adapter/> 的 scanner 属性中:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让您完全自由地选择排序、列表和锁定策略。
同样重要的是要理解,过滤器(包括 patterns、regex、prevent-duplicates 等)以及 locker 实例实际上是由 scanner 使用的。
在适配器上设置的任何这些属性随后都会被注入到内部的 scanner 中。
对于外部 scanner 的情况,所有过滤器和锁属性都禁止在 FileReadingMessageSource 上设置。
它们必须(如果需要)在该自定义 DirectoryScanner 上进行指定。
换句话说,如果您将 scanner 注入到 FileReadingMessageSource 中,则应在该 scanner 上提供 filter 和 locker,而不是在 FileReadingMessageSource 上。
默认情况下,DefaultDirectoryScanner 使用 IgnoreHiddenFileListFilter 和 AcceptOnceFileListFilter。
若要阻止其使用,您可以配置自己的过滤器(例如 AcceptAllFileListFilter),甚至将其设置为 null。 |
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner在将新文件添加到目录时依赖文件系统事件。
初始化期间,会注册该目录以生成事件。
初始文件列表也是在初始化期间构建的。
遍历目录树时,遇到的任何子目录也会被注册以生成事件。
在第一次轮询时,返回从遍历目录获得的初始文件列表。
在随后的轮询中,返回来自新创建事件的文件。
如果添加了新的子目录,则使用其创建事件来遍历新的子树以查找现有文件,并注册发现的任何新子目录。
当 WatchKey 的内部事件 queue 未被程序以与目录修改事件发生速度相匹配的速度处理时,会出现问题。
如果队列大小被超出,将发出 StandardWatchEventKinds.OVERFLOW 以指示某些文件系统事件可能已丢失。
在这种情况下,将完全重新扫描根目录。
为避免重复,请考虑使用适当的 FileListFilter(例如 AcceptOnceFileListFilter),或在处理完成后删除文件。 |
WatchServiceDirectoryScanner 可以通过 FileReadingMessageSource.use-watch-service 选项启用,该选项与 scanner 选项互斥。
将为提供的 directory 填充一个内部的 FileReadingMessageSource.WatchServiceDirectoryScanner 实例。
此外,现在 WatchService 轮询逻辑可以跟踪 StandardWatchEventKinds.ENTRY_MODIFY 和 StandardWatchEventKinds.ENTRY_DELETE。
如果您需要跟踪现有文件和新建文件的修改,您应该在 FileListFilter 中实现 ENTRY_MODIFY 事件的逻辑。
否则,这些事件中的文件将被以相同的方式处理。
The ResettableFileListFilter 实现会接收 ENTRY_DELETE 事件。
因此,它们的文件会被提供给 remove() 操作使用。
当启用此事件时,诸如 AcceptOnceFileListFilter 之类的过滤器会将这些文件移除。
结果,如果出现同名文件,它将通过过滤器并被作为消息发送。
为此,引入了 watch-events 属性(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents))。
(WatchEventType 是 FileReadingMessageSource 中的一个公共内部枚举。)
通过这种选项,我们可以对新建文件使用一套下游流程逻辑,而对修改后的文件使用另一套逻辑。
以下示例展示了如何在同一目录中为创建和修改事件配置不同的逻辑:
值得一提的是,ENTRY_DELETE事件涉及被监视目录的子目录的重命名操作。
更具体地说,与先前目录名相关的ENTRY_DELETE事件发生在通知新(已重命名)目录的ENTRY_CREATE事件之前。
在某些操作系统(如Windows)上,必须注册ENTRY_DELETE事件来处理这种情况。
否则,在文件资源管理器中重命名被监视的子目录可能导致该子目录中的新文件无法被检测到。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从版本 6.1 开始,FileReadingMessageSource 公开了两个与 WatchService 相关的新选项:
-
watchMaxDepth-Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)API 的一个参数; -
watchDirPredicate- aPredicate<Path>to test if a directory in the scanned tree should be walked and registered with theWatchServiceand the configured watch event kinds.
限制内存消耗
您可以使用 HeadDirectoryScanner 来限制内存中保留的文件数量。
这在扫描大型目录时非常有用。
通过 XML 配置,可以通过在入站通道适配器上设置 queue-size 属性来启用此功能。
在 4.2 版本之前,此设置与使用任何其他过滤器不兼容。
任何其他过滤器(包括 prevent-duplicates="true")都会覆盖用于限制大小的过滤器。
|
使用 通常,在这种情况下,您不应使用 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置出站适配器:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
'尾随'文件
另一个常用的用例是从文件末尾(或尾部)获取“行”,并在添加新行时捕获它们。
提供了两种实现方式。
第一种实现,OSDelegatingFileTailingMessageProducer,使用原生的 tail 命令(在支持该命令的操作系统上)。
在那些平台上,这通常是最高效的实现方式。
对于没有 tail 命令的操作系统,第二种实现,ApacheCommonsFileTailingMessageProducer,则使用 Apache commons-io Tailer 类。
在这两种情况下,文件系统事件(例如文件不可用)以及其他事件均通过正常的 Spring 事件发布机制作为 ApplicationEvent 实例发布。
此类事件的示例包括以下:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
前例中所示的事件序列可能会发生,例如在文件轮转时。
从版本 5.0 开始,在 idleEventInterval 期间,如果文件中没有数据,则会发出 FileTailingIdleEvent。
以下示例展示了此类事件的外观:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持 tail 命令的平台都提供这些状态消息。 |
从这些端点发出的消息具有以下标头:
-
FileHeaders.ORIGINAL_FILE:File对象 -
FileHeaders.FILENAME: 文件名 (File.getName())
在 5.0 版本之前的版本中,FileHeaders.FILENAME 头包含文件绝对路径的字符串表示。
现在,您可以通过调用原始文件头中的 getAbsolutePath() 来获取该字符串表示。 |
以下示例使用默认选项('-F -n 0',表示从当前末尾开始跟随文件名)创建一个原生适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例创建一个带有'-F -n +0'选项的原生适配器(意味着跟随文件名,输出所有现有行)。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果 tail 命令失败(在某些平台上,缺少文件会导致即使指定了 -F,tail 也会失败),该命令将每 10 秒重试一次。
默认情况下,原生适配器会从标准输出捕获内容并将其作为消息发送。
它们也会从标准错误捕获以触发事件。
从版本 4.3.6 开始,您可以通过将 enable-status-reader 设置为 false 来丢弃标准错误事件,如下例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval被设置为5000,这意味着如果五秒钟内没有写入任何行,则每五秒触发一次FileTailingIdleEvent:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
这在需要停止适配器时可能很有用。
以下示例创建一个 Apache commons-io Tailer 适配器,该适配器每两秒检查文件中的新行,并每十秒检查缺失文件是否存在:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
| 1 | 该文件是从开头(end="false")开始尾随的,而不是从末尾(这是默认行为)开始。 |
| 2 | 每个块都会重新打开文件(默认行为是保持文件打开)。 |
指定 delay、end 或 reopen 属性会强制使用 Apache commons-io 适配器,并使 native-options 属性不可用。 |
处理不完整数据
在文件传输场景中,一个常见问题是如何确定传输已完成,从而避免读取不完整的文件。
解决此问题的常用技术是:先使用临时名称写入文件,然后原子性地将其重命名为最终名称。
该技术配合一个过滤器(用于防止消费者捕获临时文件),提供了一种稳健的解决方案。
Spring Integration 中负责写入文件(本地或远程)的组件均采用此技术。
默认情况下,它们会在文件名后追加.writing,并在传输完成后移除该后缀。