消费者陷入重新加入

kyl*_*kyl 6 apache-kafka kafka-consumer-api message-hub

我已经阅读了其他主题,并且通过使用新的组ID解决了这个问题,但是我想了解是什么原因引起的。

我有一个包含16个分区的主题,我已将session.timeout.ms = 30000设置为max.poll.interval.ms = 30000000。

我运行了我的程序,然后按Ctrl + C组合键,因此无法正常关闭。在我猜了16次之后,我陷入了这个重新加入的问题。session.timeout.ms是心跳超时,因此30秒后它应该使我的用户正常运行,而我的分区应该“释放”对吗?还是只听我的max.poll.interval.ms?

编辑:我仍然间歇性地收到此错误,并且发生这种情况时,我必须重新启动所有使用者。即使我的消费者运行良好,然后他们开始陷入重新加入的困境(没有添加/移除消费者),也会发生这种情况。这是一个错误日志,来自当我尝试将新使用者卡在该状态时与它连接之后:

https://pastebin.com/AXJeSHkp

2017-06-29 17:28:16,215 DEBUG [AbstractCoordinator] - [scheduler-1] - Sending JoinGroup ((type: JoinGroupRequest, groupId=ingestion-matching-kafka-consumer-group-dev1, sessionTimeout=30000, rebalanceTimeout=43200000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@b45e5583)) to coordinator kafka04-prod01.messagehub.services.us-south.bluemix.net:9093 (id: 2147483644 rack: null)

2017-06-29 17:37:21,261 DEBUG [NetworkClient] - [scheduler-1] - Node 2147483644 disconnected.
2017-06-29 17:37:21,263 DEBUG [ConsumerNetworkClient] - [scheduler-1] - Cancelled JOIN_GROUP request {api_key=11,api_version=1,correlation_id=19,client_id=ingestion-matching-kafka-consumer-dev1} with correlation id 19 due to node 2147483644 being disconnected
Run Code Online (Sandbox Code Playgroud)

这些是我认为相关的第一个和最后一个消息。这是我设置的相关超时:

session.timeout.ms=30000
max.poll.interval.ms=43200000    
request.timeout.ms=43205000 # the docs said to keep this higher than max.poll.interval.ms
enable.auto.commit=false
Run Code Online (Sandbox Code Playgroud)

我也应该设置heartbeat.interval.ms吗?这是消费者在某些后台线程中自动将心跳发送到代理的时间间隔(我已经阅读了文档,但是由于某种原因我无法完全解决)。

H.Ç*_*Ç.T 8

我知道这是一个很老的问题,但我有类似的问题,最后我明白了这种情况的原因并想分享。

当重新平衡开始时,Kafka 等待组中的所有消费者 poll() 并发送 joinGroup 请求。重新平衡超时等于 max.poll.interval.ms。因此,Kafka 会等待每个消费者的重新平衡超时或过程结束。

在您的情况下,您将 max.poll.interval.ms 设置为 12 小时。唯一合理的理由是你必须有一个漫长的过程。因此,当重新平衡开始时,Kafka 将等到您的流程完成或 12 小时过去。这就是为什么您的消费者似乎被卡住了。


Mic*_*son 6

如果您的客户端未正确断开连接(崩溃或 SIGINT),则服务器需要 session.timeout.ms(在您的情况下为 30 秒)才能将其从组中踢出。在此期间,服务器仍会认为消费者是该组的一部分,因此不会进行任何重新分配。一旦延迟结束,分配的分区将重新分配给其他消费者(如果有)。

如果您使用新的组 ID,这当然不会发生。虽然每次开发时都想使用新组(因为您不必等待),但您会丢失前一组提交的偏移量,这可能不代表您的应用程序在生产中运行时所处的状态。

关于 max.poll.interval.ms,它是消费者逻辑中两次调用 poll() 之间允许的最大延迟。我认为此设置与此问题无关。