调试复杂 NiFi 数据流的理想方式

pra*_*por 2 hortonworks-data-platform apache-nifi hortonworks-dataflow

根据我在使用 NiFi 构建一些数据库摄取 PoC 后的理解,整个数据流作为流文件流运行。在任何特定时间,执行控制可以同时在一个或多个处理器上。

所以我对如何调试复杂的数据流以解决任何故障感到非常困惑。

我的 PoC 工作流程本身如下所示。 nifi数据流

当我们使用生产用例时,它可能会变得比这复杂得多。所以我有几个问题。

  1. 如何知道数据流的状态。如果假设 10 个分叉流文件中有 4 个GenerateTableFetch因数据库池错误而失败,我如何知道哪些文件失败以及如何快速重播它们,而无需逐一查找数据来源。

  2. 有没有一种方法可以通过查看数据流来了解哪个处理器上的哪个流文件发生了故障。

我对使用 NiFi 调试数据流有很多疑问/困惑,如果有人可以给我指出一些文档或分享最佳实践,那将会很有帮助。

谢谢。

Up_*_*One 5

1-如何了解数据流的状态。如果假设 10 个分叉流文件中有 4 个因数据库池错误而在GenerateTableFetch 中失败,那么我如何知道哪些文件失败以及如何快速重播它们,而无需进行数据治理并一一执行。

您可以通过将类型故障关系或任何其他关系(具体取决于您使用的处理器类型)发送到进程组来处理错误来进行管理。

所以就像布莱恩提到的,你不希望它们自动终止,除非你不在乎。

2-有没有一种方法可以通过查看数据流来了解哪个处理器上的哪个流文件出现故障。

是的 - 您必须设置“公告级别”来指定日志级别

如何管理失败的 NiFi 流量?

Well you need to be best friends with the BuletinBoard see here SiteToSiteStatusReportingTask or you can use InvokeHttp against the native NiFI Rest Api with a GET call for http://nifi-server:port/nifi-api/flow/bulletin-board and this will Respond with a detailed json object wich can be parsed and then pushed into a PutSlack/PutEmail/PutSNS for any error.

Also is ideal to have Shared Process Group to handle any incoming Error Flow files, this PG will be build with rules and routes to apply to all data flow logic in you NiFi server. Is critical to have PG specific attributes that will be carried with all your flows and will be used down the course of the data flow.

eg:

Process Group "Demo" has a processor called Set PG Attributes that sets the PGName attibute, PGType attibute, FailEmailTitle attribute ,etc. If my flow failes at any point the failure relation will route my failed flow based on the value of one of the Attributes set in the Set PG Attributes processor

Here is a diagram of my current setup, where i have all the failure sent to the same shared PG. 在此输入图像描述

Other option

If you think the buletin persisting for 5 min only is an issue then you can use the nifi-app.log, which can be set to be populated by the rules in your /opt/nifi/conf/logback.xml file

  <logger name="org.apache.nifi" level="ERROR"/>
    <logger name="org.apache.nifi.processors" level="DEBUG"/>
    <logger name="org.apache.nifi.processors.standard.LogAttribute" level="ERROR"/>
    <logger name="org.apache.nifi.processors.standard.LogMessage" level="ERROR"/>
    <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="ERROR" />
Run Code Online (Sandbox Code Playgroud)

So you can have tailFile processor that is looking at you local log file and grabs error information or what ever you think is of use to you and makes some sense out of it.