我有一个工作流程,其中我获取 json 文件作为其余 api 的响应。我在一次会话中收到大约 100k 个文件。所有文件的总大小为 15GB。我必须将每个文件保存到文件系统,这是我正在做的。在该过程结束时,我必须等待所有文件都存在,然后才能发送成功消息。
一旦我将文件保存在 FS 中,我就会调用notify+wait。但我不再需要流程文件中的 15 GB 数据。所以为了释放一些空间,我想到了使用replaceText或ModifyByte来清除内容。所以notify+wait运行顺利。此过程总共等待3小时。
但在这两种情况(replaceText 或ModifyByte)中,过程都花费了太长的时间。
您能否建议清除流文件数据的最快方法。我也不需要任何属性。那么我可以放弃旧的流程文件并生成 kb 流程文件吗?
我想要的是类似于generateflowfile的东西,但是在中间,所以对于我现有的每个流程文件,我可以删除旧的流程文件,并生成空白流程文件以进行通知和等待。
谢谢
NiFi 的内容存储库和 FlowFile 存储库基于写入时复制机制,因此如果您不更改内容或元数据,那么您不一定要在这些处理器上“保留”15GB。
话虽如此,如果您需要的只是磁盘上存在此类流文件(而不是内容或元数据),请尝试使用以下 Groovy 脚本执行 ExecuteScript:
def flowFiles = session.get(1000)
flowFiles.each {
session.transfer(session.create(), REL_SUCCESS)
}
session.remove(flowFiles)
Run Code Online (Sandbox Code Playgroud)
该脚本将一次抓取最多 1000 个流文件,并为每个文件向下游发送一个空流文件。然后它会删除所有原始传入流文件。
请注意,这(即您的用例)将“破坏”出处/谱系链,因此如果您的流程中出现问题,您将无法分辨哪些流程文件来自哪个父流程文件等。此限制这是您看不到执行此类功能的完整处理器的原因之一。
| 归档时间: |
|
| 查看次数: |
12470 次 |
| 最近记录: |