在 NiFi 中,存在从MQTT(ConsumeMQTT)消费并发布到HDFS路径(PutHDFS)的数据流。我需要在将消耗的数据推送到 HDFS 路径之前引入60 分钟的延迟。发现 ControlRate 和 MergeContent 处理器是可能的解决方案但不确定。
引入时间延迟的理想解决方案是什么?
示例:上午 9:00 使用的流文件应在上午 10:00 发布到 HDFS
您可以使用ExecuteScript处理器来运行sleep(60*60*1000)循环,但这会不必要地使用系统资源。
相反,我会引入一个RouteOnAttribute处理器,它的输出关系one_hour_elapsed为PutHDFS,然后unmatched循环回到自身。该RouteOnAttribute处理器应该具有路由策略设置为路由为属性的名称和动态特性(点击+的属性选项卡右上角的按钮)命名one_hour_elapsed。表达式语言值应为${now():toNumber():gt(${entryDate:toNumber():plus(3600000)})}.
这个表达:
now():toNumber())以来的毫秒数entryDate属性(当它进入NiFi时)并将其转换为毫秒并添加一小时(entryDate:toNumber():plus(3600000)[ 3600000 == 60*60*1000])a:gt(${b}))如果这实际上不是流程的开始,您可以使用UpdateAttribute处理器在流程的任何点插入任意时间戳并从那里进行计算。
我还建议设置产量持续时间和运行计划中的RouteOnAttribute处理器要比平时高得多的,因为你不想要这个处理器不断运行,因为它什么也不干。我建议将其设置为 1 或 5 分钟开始,因为您已经引入了一个小时的延迟。
| 归档时间: |
|
| 查看次数: |
2302 次 |
| 最近记录: |