处理kafka消息需要很长时间

Sta*_*asM 2 apache-kafka kafka-consumer-api kafka-python

我有一个 Python 进程(或者更确切地说,在消费者组中并行运行的一组进程),它根据来自某个主题的 Kafka 消息输入来处理数据。通常每条消息都会很快得到处理,但有时,根据消息的内容,可能需要很长时间(几分钟)。在这种情况下,Kafka代理会断开客户端与组的连接并启动重新平衡。我可以设置session_timeout_ms一个非常大的值,但大约需要 10 分钟以上,这意味着如果客户端挂掉,集群将在 10 分钟内无法正确重新平衡。这似乎是一个坏主意。此外,大多数消息(大约 98%)都很快,因此为 1-2% 的消息支付这样的惩罚似乎很浪费。OTOH,大消息足够频繁,足以导致大量重新平衡并消耗大量性能(因为当组重新平衡时,什么也没有完成,然后“死”客户端再次重新加入并导致另一次重新平衡)。

那么,我想知道是否还有其他方法来处理需要很长时间才能处理的消息?有没有办法手动启动心跳来告诉代理“没关系,我还活着,我只是在处理消息”?我认为 Python 客户端(我使用的kafka-python 1.4.7)应该为我做这件事,但它似乎没有发生。此外,该 API 似乎根本没有单独的“心跳”功能。据我了解,调用poll()实际上会给我下一条消息——而我什至还没有完成当前的消息,并且还会弄乱 Kafka 消费者的迭代器 API,这在 Python 中使用起来相当方便。

如果很重要的话,如果我没记错的话,Kafka 集群是 Confluence,版本 2.3。

sun*_*007 6

在Kafka中,0.10.1+ Kafka轮询和会话心跳是相互解耦的。你可以在这里得到解释

max.poll.interval.ms在超时之前允许消费者实例完成处理的时间意味着如果处理时间超过 max.poll.interval.ms 时间,消费者组将假定其模具从消费者组中移除并调用重新平衡。

增加此值将增加预期轮询之间的间隔,从而使消费者有更多时间来处理从 poll(long) 返回的一批记录。但同时,它也会延迟组重新平衡,因为消费者只会在轮询调用中加入重新平衡。

session.timeout.ms是用于确定消费者是否仍然存在并按定义的时间间隔 (heartbeat.interval.ms) 发送心跳的超时。一般来说,经验法则是 heartbeat.interval.ms 应该是会话超时的 1/3,这样在网络故障的情况下,消费者在会话超时之前最多可以错过 3 次心跳。

  1. session.timeout.ms:较低的值有利于更快地检测故障。

  2. max.poll.interval.ms:较大的值将降低由于处理时间增加而导致失败的风险,但会增加重新平衡时间。

注意:Consumer Group消耗的大量分区和主题也会影响整体重新平衡时间

另一种方法是,如果您确实想摆脱重新平衡,您可以使用分区分配在每个使用者实例上手动分配分区。在这种情况下,每个消费者实例将使用自己分配的分区独立运行。但在这种情况下,您将无法利用重新平衡功能自动分配分区。