Apache NiFi 和 Kafka 集成

Akh*_*aby 3 apache-kafka apache-nifi

我不确定这个问题是否已经在某处得到解决,但我在互联网上的任何地方都找不到有用的答案。

我正在尝试将 Apache NiFi 与 Kafka 集成 - 使用 Apache NiFi 从 Kafka 消费数据。在继续之前,以下是我想到的几个问题。

Q-1)我们的用例是 - 从 Kafka 实时读取数据,解析数据,对数据进行一些基本验证,然后将数据推送到 HBase。我知道 Apache NiFi 是进行这种处理的合适人选,但是如果我们正在处理的 JSON 是一个复杂的工作流,那么构建工作流有多容易?我们最初想使用 Java 代码做同样的事情,但后来意识到这可以在 NiFi 中以最少的努力完成。请注意,我们从 Kafka 处理的 80% 的数据是简单的 JSON,但 20% 是复杂的(invovles 数组)

Q-2) 编写 Kafka 消费者时最棘手的部分是正确处理偏移量。Apache NiFi 在从 Kafka 主题中消费时将如何处理偏移量?如果在处理时触发重新平衡,将如何正确提交偏移量?Spring-Kafka 等框架提供了提交偏移量(在某种程度上)的选项,以防在处理过程中触发重新平衡。NiFi 如何处理?

pus*_*har 8

我在生产中的 3 节点 NiFi 集群中部署了许多管道,其中一个类似于您的用例。

Q-1)为您的用例构建管道非常简单和容易。由于您没有提到processingjson 中涉及的任务类型,我假设是通用任务。涉及 JSON 的通用任务可以是模式验证,可以使用ValidateRecordProcessor实现,使用Processor 进行转换,使用JoltTransformRecord提取属性值EvaluateJsonPath,将 json 转换为其他格式,例如使用ConvertJSONToAvro处理器的 avro 等。 Nifi 使您可以灵活地扩展每个阶段/处理器管道独立。例如,如果使用 JoltTransformRecord 进行转换是耗时的,您可以N通过Concurrent TasksScheduling选项卡下进行配置来扩展它以在每个节点中运行并发任务。

Q-2)就ConsumeKafka_2_0处理器而言,偏移管理是通过首先提交 NiFi 处理器会话然后提交 Kafka 偏移来处理的,这意味着我们默认至少有一次保证。当 Kafka 触发给定分区的消费者重新平衡时,处理器会快速提交(处理器会话和 Kafka 偏移量)它所获得的任何内容,并将消费者返回到池中以供重用。

ConsumeKafka_2_0 处理消费者组成员变化或成员订阅变化时的提交偏移量。这可能发生在流程终止、添加新流程实例或旧实例在失败后恢复生机时。还注意在管理上调整订阅主题的分区数量的情况。