Kafka消费者如何从多个分配的分区中消费

DVS*_*DVS 7 java apache-kafka kafka-consumer-api

TL;博士; 我试图了解分配了多个分区的单个使用者如何处理消耗分区的记录.

例如:

  • 在移动到下一个分区之前完全处理单个分区.
  • 每次处理每个分区的一大块可用记录.
  • 从第一个可用分区处理一批N条记录
  • 在循环轮换中处理来自分区的一批N条记录

我找到了partition.assignment.strategy配置Ranged或配置器,RoundRobin但这只能确定如何为消费者分配分区,而不是从分配给它的分区中获取分配的方式.

我开始深入研究KafkaConsumer源代码,而 #poll()引导我进入#pollForFetches() #pixForFetches()然后引导我到fetcher#fetchedRecords()fetcher#sendFetches()

这只是让我尝试一起跟随整个Fetcher课程,也许它只是迟到或者我只是没有深入挖掘,但我无法解决消费者如何处理多个指定分区.

背景

处理由Kafka Streams支持的数据管道.

在此管道中的几个阶段,由于记录由不同的Kafka Streams应用程序处理,因此流将连接到由外部数据源提供的压缩主题,外部数据源提供将在继续下一个处理阶段之前在记录中增加的所需数据.

在此过程中,有几个死信主题,其中记录无法与可能增加记录的外部数据源匹配.这可能是因为数据尚未可用(事件或广告系列尚未投放),或者它是不良数据并且永远不会匹配.

目标是在发布新的增强数据时重新发布死信主题中的记录,以便我们可以匹配死信主题中以前不匹配的记录,以便更新它们并将它们发送到下游以进行其他处理.

记录可能无法在多次尝试中匹配,并且可能在死信主题中有多个副本,因此我们只想重新处理现有记录(在应用程序启动时的最新偏移之前)以及发送到死信主题的记录自上次运行应用程序以来(在先前保存的消费者组偏移之后).

它很好用,因为我的消费者过滤掉了应用程序启动后到达的所有记录,我的生产者通过提交偏移作为发布交易的一部分来管理我的消费者群体抵消.

但是我想确保我最终将从所有分区中消耗,因为我遇到了一个奇怪的边缘情况,其中未匹配的记录被重新处理并落入与死信主题中相同的分区,仅被消费者过滤掉.虽然没有获得新批次的记录,但是还有一些分区还没有被重新处理.

任何帮助了解单个消费者如何处理多个分配的分区将不胜感激.

Mic*_*son 15

您走在正确的轨道上,Fetcher因为大部分逻辑都在那里。

首先,正如Consumer Javadoc 所提到的:

如果为消费者分配了多个分区以从中获取数据,它将尝试同时从所有分区中消费,从而有效地为这些分区提供相同的消费优先级。

可以想象,在实践中,有一些事情需要考虑。

  • 每次消费者尝试获取新记录时,它都会排除已经有记录等待的分区(来自之前的获取)。已经有正在进行的获取请求的分区也被排除在外。

  • 获取记录时,消费者在获取请求中指定fetch.max.bytesmax.partition.fetch.bytes。代理使用它们来分别确定总共和每个分区返回多少数据。这同样适用于所有分区。

使用这 2 种方法,默认情况下,消费者尝试从所有分区公平地消费。如果不是这种情况,更改fetch.max.bytesmax.partition.fetch.bytes通常会有所帮助。

如果您希望某些分区优先于其他分区,则需要使用pause()resume()手动控制消费流。