小编Akh*_*aby的帖子

Spring-Kafka 并发属性

我正在使用 Spring-Kafka 编写我的第一个 Kafka Consumer。查看了框架提供的不同选项,并且对此几乎没有怀疑。有人可以在下面澄清,如果你已经工作过。

问题 - 1:根据 Spring-Kafka 文档,有两种方法可以实现 Kafka-Consumer;“您可以通过配置 MessageListenerContainer 并提供消息侦听器或使用 @KafkaListener 注释来接收消息”。有人能告诉我什么时候应该选择一个选项而不是另一个选项吗?

问题 - 2:我选择了 KafkaListener 方法来编写我的应用程序。为此,我需要初始化一个容器工厂实例,并且在容器工厂内部有控制并发的选项。只是想仔细检查我对并发的理解是否正确。

假设,我有一个主题名称 MyTopic,其中有 4 个分区。为了使用来自 MyTopic 的消息,我已经启动了我的应用程序的 2 个实例,这些实例是通过将并发设置为 2 来启动的。因此,理想情况下,根据 kafka 分配策略,2 个分区应分配给 consumer1,其他 2 个分区应分配给 consumer2 . 既然并发设置为2,那么每个consumer是否会启动2个线程,并行的从topic中消费数据?如果我们并行消费,我们还应该考虑什么。

问题 3 - 我选择了手动确认模式,而不是在外部管理偏移量(不将其持久化到任何数据库/文件系统)。那么我是否需要编写自定义代码来处理重新平衡,或者框架会自动管理它?我认为没有,因为我只有在处理完所有记录后才承认。

问题 - 4:另外,使用手动 ACK 模式,哪个监听器会提供更好的性能?BATCH 消息侦听器或普通消息侦听器。我想如果我使用普通消息侦听器,则在处理每条消息后将提交偏移量。

粘贴下面的代码供您参考。

批确认消费者

    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
          Consumer<?, ?> consumer) {
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("Record : " + record.value());
          // Process …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka spring-kafka

13
推荐指数
1
解决办法
1万
查看次数

Apache NiFi 和 Kafka 集成

我不确定这个问题是否已经在某处得到解决,但我在互联网上的任何地方都找不到有用的答案。

我正在尝试将 Apache NiFi 与 Kafka 集成 - 使用 Apache NiFi 从 Kafka 消费数据。在继续之前,以下是我想到的几个问题。

Q-1)我们的用例是 - 从 Kafka 实时读取数据,解析数据,对数据进行一些基本验证,然后将数据推送到 HBase。我知道 Apache NiFi 是进行这种处理的合适人选,但是如果我们正在处理的 JSON 是一个复杂的工作流,那么构建工作流有多容易?我们最初想使用 Java 代码做同样的事情,但后来意识到这可以在 NiFi 中以最少的努力完成。请注意,我们从 Kafka 处理的 80% 的数据是简单的 JSON,但 20% 是复杂的(invovles 数组)

Q-2) 编写 Kafka 消费者时最棘手的部分是正确处理偏移量。Apache NiFi 在从 Kafka 主题中消费时将如何处理偏移量?如果在处理时触发重新平衡,将如何正确提交偏移量?Spring-Kafka 等框架提供了提交偏移量(在某种程度上)的选项,以防在处理过程中触发重新平衡。NiFi 如何处理?

apache-kafka apache-nifi

3
推荐指数
1
解决办法
1240
查看次数

Apache NiFi - “ExecuteSQL”并行运行查询?

Apache NiFi 提供“ExecuteSQL”处理器来执行查询并将结果作为流文件返回。但是,如果我们选择执行选项为“所有节点”,NiFi是否会将查询分成不同的批次并并行执行每个批次(类似于SQOOP的做法)?

apache-nifi

0
推荐指数
1
解决办法
1539
查看次数

标签 统计

apache-kafka ×2

apache-nifi ×2

spring ×1

spring-kafka ×1