删除 nifi flowfile 内容的最快方法是什么?

Rak*_*sad 5 apache-nifi

我有一个工作流程,其中我获取 json 文件作为其余 api 的响应。我在一次会话中收到大约 100k 个文件。所有文件的总大小为 15GB。我必须将每个文件保存到文件系统,这是我正在做的。在该过程结束时,我必须等待所有文件都存在,然后才能发送成功消息。

一旦我将文件保存在 FS 中,我就会调用notify+wait。但我不再需要流程文件中的 15 GB 数据。所以为了释放一些空间,我想到了使用replaceText或ModifyByte来清除内容。所以notify+wait运行顺利。此过程总共等待3小时。

但在这两种情况(replaceText 或ModifyByte)中,过程都花费了太长的时间。

您能否建议清除流文件数据的最快方法。我也不需要任何属性。那么我可以放弃旧的流程文件并生成 kb 流程文件吗?

我想要的是类似于generateflowfile的东西,但是在中间,所以对于我现有的每个流程文件,我可以删除旧的流程文件,并生成空白流程文件以进行通知和等待。

谢谢

mat*_*tyb 6

NiFi 的内容存储库和 FlowFile 存储库基于写入时复制机制,因此如果您不更改内容或元数据,那么您不一定要在这些处理器上“保留”15GB。

话虽如此,如果您需要的只是磁盘上存在此类流文件(而不是内容或元数据),请尝试使用以下 Groovy 脚本执行 ExecuteScript:

def flowFiles = session.get(1000)
flowFiles.each {
   session.transfer(session.create(), REL_SUCCESS)
}
session.remove(flowFiles)
Run Code Online (Sandbox Code Playgroud)

该脚本将一次抓取最多 1000 个流文件,并为每个文件向下游发送一个空流文件。然后它会删除所有原始传入流文件。

请注意,这(即您的用例)将“破坏”出处/谱系链,因此如果您的流程中出现问题,您将无法分辨哪些流程文件来自哪个父流程文件等。此限制这是您看不到执行此类功能的完整处理器的原因之一。

  • 您还可以执行“session.create(it)”,它不会丢失沿袭(并且它还保留与传入流文件相同的属性) (2认同)