减少 Kafka Consumer 配置中 max.poll.records 的影响

Pra*_*nks 6 apache-kafka spring-kafka

我正在编写一个消费者应用程序,从 kafka 流中选取记录并使用 spring-kafka 对其进行处理。我的处理步骤如下:

Getting records from stream --> dump it into a table --> Fetch records and call API --> API will update records into a table --> calling Async Commit()
Run Code Online (Sandbox Code Playgroud)

在某些情况下,API 处理似乎需要更多时间,因为正在获取更多记录,并且我们收到以下错误?

由于消费者轮询超时已过期,成员 Consumer-prov-em-1-399ede46-9e12-4388-b5b8-f198a4e6a5bc 向协调器 apslt2555.uhc.com:9095(id:2147483577 机架:null)发送 LeaveGroup 请求。这意味着对 poll() 的后续调用之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加 max.poll.interval.ms 或通过使用 max.poll.records 减少 poll() 中返回的批次的最大大小来解决此问题。

org.apache.kafka.clients.consumer.CommitFailedException:提交无法完成,因为组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着 poll 循环花费了太多时间处理消息。您可以通过增加 max.poll.interval.ms 或通过使用 max.poll.records 减少 poll() 中返回的批次的最大大小来解决此问题。

我知道这可以通过减少 max.poll.records 或增加 max.poll.interval.ms 来解决。我想了解的是,如果我将 max.poll.records 设置为 10 那么 poll() 行为会是什么?是否需要从流中获取 10 条记录等待这些记录被提交,然后再获取下 10 条记录?当下一次轮询发生时?它是否也会影响性能,因为我们将 max.poll.records 从默认的 500 减少到 10。

我还必须增加 max.poll.interval.ms 吗?大概要10分钟吧。更改这些值时我应该注意哪些负面影响?除了这些参数之外,还有其他方法来处理这些错误吗?

Avi*_*rya 10

max.poll.records允许批处理消耗模型,其中记录在将记录刷新到另一个系统之前收集在内存中。这个想法是通过从kafka一起轮询来获取所有记录,然后在轮询循环中在内存中处理它。

如果减少数量,那么消费者将从 kafka 进行轮询的频率会更高。这意味着它需要更频繁地进行网络调用。这可能会降低 kafka 流处理的性能。

max.poll.interval.ms控制消费者主动离开组之前轮询调用之间的最长时间。如果这个数字增加,那么 kafka 将需要更长的时间来检测消费者故障。另一方面,如果这个值太低,kafka 可能会错误地将许多活着的消费者检测为失败,从而更频繁地重新平衡。


归档时间:

查看次数:

8954 次

最近记录:

5 年 前