Kafka delivering duplicate messages

Pra*_*mar 2 apache-kafka kafka-consumer-api kafka-python

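We are using Kafka (0.9.0.0) to orchestrate command messages between different microservices. We are seeing an intermittent issue where duplicate messages are delivered to a particular topic. The logs from when the issue occurs are below. Can someone help us understand this problem?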

Wed, 21-Sep-2016 09:19:07 - WARNING Coordinator unknown during heartbeat -- will retry
Wed, 21-Sep-2016 09:19:07 - WARNING Heartbeat failed; retrying
Wed, 21-Sep-2016 09:19:07 - WARNING <BrokerConnection host=AZSG-D-BOT-DEV4 port=9092> timed out after 40000 ms. Closing connection.
Wed, 21-Sep-2016 09:19:07 - ERROR Fetch to node 1 failed: RequestTimedOutError - 7 - This error is thrown if the request exceeds the user-specified time limit in the request.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092)
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery.
Wed, 21-Sep-2016 09:19:07 - ERROR LeaveGroup request failed: UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, 21-Sep-2016 09:19:07 - INFO Group coordinator for kafka-python-default-group is BrokerMetadata(nodeId=1, host=u'AZSG-D-BOT-DEV4', port=9092)
Wed, 21-Sep-2016 09:19:07 - ERROR OffsetCommit failed for group kafka-python-default-group due to group error (UnknownMemberIdError - 25 - Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.), will rejoin
Wed, 21-Sep-2016 09:19:07 - WARNING Offset commit failed: group membership out of date This is likely to cause duplicate message delivery.
Wed, 21-Sep-2016 09:19:10 - INFO Joined group 'kafka-python-default-group' (generation 5) with member_id kafka-python-1.0.2-8585f310-cb4f-493a-a98d-12ec9810419b
Wed, 21-Sep-2016 09:19:10 - INFO Updated partition assignment: [TopicPartition(topic=u'ilinaTestPlatformReq', partition=0)]

Ali*_*der 5

From the Kafka documentation on consumer configuration:

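session.timeout.ms (default 30000): The timeout used to detect failures when using Kafka's group management facilities. When a consumer's heartbeat is not received within the session timeout, the broker will mark the consumer as failed and rebalance the group. Since heartbeats are sent only when poll() is invoked, a higher session timeout allows more time for message processing in the consumer's poll loop at the cost of a longer time to detect hard failures. See also max.poll.records for another option to control the processing time in the poll loop. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.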

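It looks like message processing that takes longer than 30000 ms triggers a consumer rebalance, which can lead to duplicate message delivery. Your log points the same way: the offset commit is rejected with UnknownMemberIdError because the consumer is no longer a member of the current group generation, so once it rejoins, consumption resumes from the last committed offset and anything processed since then is delivered again.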

One thing you can try is increasing session.timeout.ms.
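As a rough sketch of what that could look like with kafka-python, the helper below derives the timeout settings from a worst-case processing time. The names consumer_timeouts, make_consumer, max_processing_ms and margin_ms are made up for illustration; session_timeout_ms, heartbeat_interval_ms and request_timeout_ms are KafkaConsumer keyword arguments. Two assumptions to verify against your versions: that the client expects request_timeout_ms to be larger than session_timeout_ms, and that the broker's group.max.session.timeout.ms (which I believe defaults to 30000 on 0.9) has been raised to allow the new value.

```python
def consumer_timeouts(max_processing_ms, margin_ms=5000):
    """Derive KafkaConsumer timeout kwargs from a worst-case processing time."""
    session_timeout_ms = max_processing_ms + margin_ms
    return {
        # Must fall within the broker's group.min/max.session.timeout.ms range.
        "session_timeout_ms": session_timeout_ms,
        # Commonly kept at no more than a third of the session timeout.
        "heartbeat_interval_ms": session_timeout_ms // 3,
        # Assumption: the client wants this larger than the session timeout.
        "request_timeout_ms": session_timeout_ms + 10000,
    }


def make_consumer(topic, bootstrap_servers, max_processing_ms):
    """Build a consumer whose session outlives the slowest expected message."""
    # Imported lazily so consumer_timeouts() works without kafka-python installed.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id="kafka-python-default-group",  # group name taken from the log
        **consumer_timeouts(max_processing_ms)
    )
```

For example, make_consumer("ilinaTestPlatformReq", "AZSG-D-BOT-DEV4:9092", 55000) would ask for a 60-second session. The trade-off from the documentation still applies: a consumer that really has died takes that much longer to be noticed.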

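Another option is to process messages asynchronously, calling pause() before processing starts and resume() once it has finished. In that case the consumer keeps calling poll() (and sending heartbeats) even when processing takes longer than session.timeout.ms, so the broker does not mark your consumer as failed and does not start a rebalance.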
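Here is a minimal sketch of the pause()/resume() approach, written for Python 3 and not tested against a live cluster. process_with_keepalive, handler and poll_timeout_ms are illustrative names, not part of any library; the consumer is expected to be a kafka-python KafkaConsumer, of which only assignment(), pause(), resume() and poll() are used. It assumes that poll() returns no records for paused partitions, and it does not handle a rebalance that happens while the handler is running.

```python
from concurrent.futures import ThreadPoolExecutor


def process_with_keepalive(consumer, records, handler, executor,
                           poll_timeout_ms=1000):
    """Run handler(records) on a worker thread while the consumer keeps polling."""
    partitions = list(consumer.assignment())
    consumer.pause(*partitions)  # stop fetching, but stay in the group
    try:
        future = executor.submit(handler, records)
        while not future.done():
            # Partitions are paused, so this is expected to return nothing;
            # it is called only so the client keeps sending heartbeats.
            consumer.poll(timeout_ms=poll_timeout_ms)
        return future.result()  # re-raises any exception from the handler
    finally:
        consumer.resume(*partitions)
```

In a consume loop you would call this for each batch returned by poll(), sharing one ThreadPoolExecutor(max_workers=1), and commit offsets only after it returns. Keeping poll_timeout_ms well below the session timeout is what keeps the heartbeats flowing.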