如何在没有自动提交的情况下长时间(4-60 分钟)处理 Kafka 消息,并在不进行重新平衡的情况下提交它

Rub*_*uck 4 python-3.x apache-kafka

如何在不自动提交的情况下消费 Kafka 消息,长时间处理它(4-60 分钟),并在不进行重新平衡、分区重新分配或阻止其他组消费者消费其他消息的情况下提交它。

\n\n

I\xe2\x80\x99m 使用 Python 3.8 Kafka 消费者,以:

\n\n
    \n
  • 一次使用一条消息,不自动提交。
  • \n
  • 启动一个长时间运行的脚本(在Python中读取\xe2\x80\x99s标准输出)
  • \n
  • 有条件地提交消息。
  • \n
\n\n

我的问题是,卡夫卡分区经常会重新平衡到另一个消费者组成员。

\n\n

在仔细阅读文档后,我尝试并使用了这些配置属性:

\n\n
    \n
  • 会话超时毫秒
  • \n
  • 请求超时毫秒
  • \n
  • 最大轮询间隔时间

    \n\n
    from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition\n\ndef consume_one_message_at_a_time(conf):\n\nconf.models_dir = f\'{conf.project_root}/{conf.models_dir}\'\ngroup_id = conf.group_id\ngroup_conf = conf.consumer_groups[group_id]\n\nkafka_brokers = conf.KAFKA_BROKERS\ntopic = group_conf.subscribe[0].name\n\nprint(f\'KAFKA_BROKERS: {kafka_brokers}\\n Topic {topic}\\n group id: {group_id}\')\n\nconsumer = KafkaConsumer(\n    topic,\n    bootstrap_servers=kafka_brokers,\n    group_id=group_id,\n    enable_auto_commit=False,\n    max_poll_records=1,\n    max_poll_interval_ms=1800000,\n    # session_timeout_ms=1800000,\n    # request_timeout_ms=1800002,\n    # connections_max_idle_ms=1800003\n    # heartbeat_interval_ms=1800000,\n)\n\nprint(f\'bootstrap_servers: {kafka_brokers} subscribing to {topic}\')\nconsumer.subscribe([topic])\n\nfor message in consumer:\n    print(f"message is of type: {type(message)}")\n\n    if not group_conf.use_cmd:\n        do_something_time_consuming(message)\n    else:\n        if group_id == \'bots\' and check_bot_id(message):\n\n            bot_action(conf, group_conf, message)\n        else:\n            print(f\'no action for group_id: {group_id}\')\n            print(f\'key  : {message.key}\')\n            print(f\'value: {message.value}\')\n\n    meta = consumer.partitions_for_topic(message.topic)\n\n    partition = TopicPartition(message.topic, message.partition)\n    offsets = OffsetAndMetadata(message.offset + 1, meta)\n    options = {partition: offsets}\n\n    print(f\'\\noptions: {options}\\n\')\n\n    response = consumer.commit(offsets=options)\n
    Run Code Online (Sandbox Code Playgroud)
  • \n
\n\n

当其他组成员订阅或完成其工作并消费时,我收到此错误:

\n\n
    Traceback (most recent call last):\n  File "./consumer_one_at_a_time.py", line 148, in <module>\n    consume_one_message_at_a_time(_conf)\n  File "./consumer_one_at_a_time.py", line 141, in consume_one_message_at_a_time\n    response = consumer.commit(offsets=options)\n  File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 526, in commit\n    self._coordinator.commit_offsets_sync(offsets)\n  File "/usr/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 518, in commit_offsets_sync\n    raise future.exception # pylint: disable-msg=raising-bad-type\nkafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already\n            rebalanced and assigned the partitions to another member.\n            This means that the time between subsequent calls to poll()\n            was longer than the configured max_poll_interval_ms, which\n            typically implies that the poll loop is spending too much\n            time message processing. You can address this either by\n            increasing the rebalance timeout with max_poll_interval_ms,\n            or by reducing the maximum size of batches returned in poll()\n            with max_poll_records.\n
Run Code Online (Sandbox Code Playgroud)\n\n

添加这些配置后我发现新的消费者被阻止了!即,在提交消息之前不要使用消息!

\n\n
session_timeout_ms=1800000,\nrequest_timeout_ms=1800002,\nconnections_max_idle_ms=1800003\n# heartbeat_interval_ms=1800000,\n
Run Code Online (Sandbox Code Playgroud)\n\n

我读到后台线程应该发送心跳。有没有一种方法可以在不轮询的情况下发送心跳?

\n

H.Ç*_*Ç.T 6

有没有一种方法可以在不轮询的情况下发送心跳?

它已经像这样工作了。从 0.10.1.0 版本开始,心跳是通过 Kafka 中的单独线程发送的。(您可以查看以获取更多信息)

一般来说,重新平衡发生在以下情况:

  • 新消费者加入消费组
  • 添加新分区
  • 彻底关闭消费者
  • Kafka 认为消费者已死亡
    • session.timeout.ms 过期而不发送心跳
    • max.poll.timeout.ms 到期但未发送轮询请求

看来你的情况是最后一种。max.poll.interval.ms您轮询记录,但由于进程运行时间较长,因此不会在(您的情况为 30 分钟)内再次轮询。为了解决这个问题:

  • 你可以增加max.poll.interval.ms. 但这可能会导致重新平衡时间过长。因为rebalance.timeout = max.poll.interval.ms。重新平衡启动后,消费者组中的所有消费者都被撤销,并且 Kafka 等待所有仍在向 poll() 发送心跳的消费者(通过轮询消费者在此时发送 joinGroupRequest),直到重新平衡超时到期(等于 )max.poll.interval.ms。假设您设置max.poll.interval.ms为 60 分钟,而您的过程需要 50 分钟才能完成。如果由于我上面提到的任何原因在您的漫长过程的第五分钟开始重新平衡,那么 Kafka 将等待您的消费者轮询 45 分钟。在此期间所有消费者将被撤销。(这个消费群体的消费将完全停止)所以恕我直言,这不是一个好主意。(当然这取决于你的需求)
  • 另一个解决方案是不使用 Kafka 进行此类长时间运行的操作。因为Kafka不适合长时间运行的处理。您可以将有关长流程的元数据保留为 Kafka 消息消费的一部分,然后在不使用 Kafka 的情况下进行适当的操作。