在将流文件移动到 NiFi 中的下一个处理器之前引入时间延迟

Vas*_*ian 5 apache-nifi

在 NiFi 中,存在从MQTT(ConsumeMQTT)消费并发布到HDFS路径(PutHDFS)的数据流。我需要在将消耗的数据推送到 HDFS 路径之前引入60 分钟的延迟。发现 ControlRate 和 MergeContent 处理器是可能的解决方案但不确定。

引入时间延迟的理想解决方案是什么?

示例:上午 9:00 使用的流文件应在上午 10:00 发布到 HDFS

在此处输入图片说明

And*_*ndy 8

您可以使用ExecuteScript处理器来运行sleep(60*60*1000)循环,但这会不必要地使用系统资源。

相反,我会引入一个RouteOnAttribute处理器,它的输出关系one_hour_elapsedPutHDFS,然后unmatched循环回到自身。该RouteOnAttribute处理器应该具有路由策略设置为路由为属性的名称和动态特性(点击+的属性选项卡右上角的按钮)命名one_hour_elapsed。表达式语言值应为${now():toNumber():gt(${entryDate:toNumber():plus(3600000)})}.

这个表达:

  1. 获取当前时间并将其转换为自纪元 ( now():toNumber())以来的毫秒数
  2. 获取流文件的entryDate属性(当它进入NiFi时)并将其转换为毫秒并添加一小时(entryDate:toNumber():plus(3600000)[ 3600000 == 60*60*1000])
  3. 比较两个数字 ( a:gt(${b}))

如果这实际上不是流程的开始,您可以使用UpdateAttribute处理器在流程的任何点插入任意时间戳并从那里进行计算。

我还建议设置产量持续时间运行计划中的RouteOnAttribute处理器要比平时高得多的,因为你想要这个处理器不断运行,因为它什么也不干。我建议将其设置为 1 或 5 分钟开始,因为您已经引入了一个小时的延迟。

显示时序循环的流程


Fri*_*ing 8

从 nifi 1.10 开始,使用 RetryFlowfile 处理器可以更轻松地完成此操作。使用惩罚持续时间来设置延迟时间:

在此输入图像描述