What is the correct way to manually commit offsets to a Kafka topic?

Sha*_*eel 5 python python-3.x apache-kafka kafka-python

I have a consumer script that processes each message and manually commits its offset to the topic.

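import json
import logging

from kafka import KafkaConsumer, TopicPartition
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.structs import OffsetAndMetadata

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

# Hypothetical placeholder values; substitute your own topic, broker and group.
KAFKA_TOPIC = "my-topic"
KAFKA_SERVER = "localhost:9092"
CONSUMER_GROUP = "my-group"

CONSUMER = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset="earliest",
    max_poll_records=100,
    enable_auto_commit=False,
    group_id=CONSUMER_GROUP,
    # Use the RoundRobinPartition method
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

count = 0
while True:
    count += 1
    LOGGER.info("--------------Poll {0}---------".format(count))
    # Iterating over the consumer never ends (consumer_timeout_ms defaults to infinity),
    # so this outer loop body actually runs only once.
    for msg in CONSUMER:
        # Process msg.value
        # Commit offset to topic: Kafka expects the offset of the *next* message to read
        tp = TopicPartition(msg.topic, msg.partition)
        offsets = {tp: OffsetAndMetadata(msg.offset + 1, None)}
        CONSUMER.commit(offsets=offsets)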
Run Code Online (Sandbox Code Playgroud)

Processing each message takes less than 1 second.
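Because poll() is only called again once the records from the previous poll have been handled, the time between polls is roughly max_poll_records multiplied by the per-message processing time (plus any per-message commit round trip), and that product has to stay below max_poll_interval_ms. A quick check of the numbers in the question, assuming kafka-python's default max_poll_interval_ms of 300000 ms; the helper names are illustrative:

```python
def worst_case_gap_ms(max_poll_records, per_message_s, per_commit_s=0.0):
    """Worst-case time between two poll() calls, in milliseconds, when every
    record of a full batch is processed (and optionally committed) one by one."""
    return max_poll_records * (per_message_s + per_commit_s) * 1000


def fits_poll_interval(max_poll_records, per_message_s, max_poll_interval_ms,
                       per_commit_s=0.0):
    """True if a full batch is expected to finish before max_poll_interval_ms."""
    gap = worst_case_gap_ms(max_poll_records, per_message_s, per_commit_s)
    return gap < max_poll_interval_ms


# Settings from the question: 100 records per poll, up to 1 s each.
print(worst_case_gap_ms(100, 1.0))           # 100000.0
print(fits_poll_interval(100, 1.0, 300000))  # True
# If messages averaged 5 s instead, a full batch would exceed the default.
print(fits_poll_interval(100, 5.0, 300000))  # False
```

If the "< 1 s" figure really holds for every message, a full batch should fit within the default interval, so it may be worth looking for occasional slow messages or slow synchronous commits.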

I get this error:

kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.


Process finished with exit code 1
Run Code Online (Sandbox Code Playgroud)

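What I'm looking for:

a) How do I fix this error?

b) How can I make sure my manual commits are working correctly?

c) What is the correct way to commit offsets?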
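For (b) and (c), one common pattern with enable_auto_commit=False is to poll a batch, process every record in it, and only then commit, once per batch rather than once per message. A minimal sketch, assuming consumer is a kafka-python KafkaConsumer; run_consumer_loop and handle are hypothetical names. It relies on the documented behaviour that KafkaConsumer.commit() with no arguments synchronously commits the currently consumed offsets:

```python
def run_consumer_loop(consumer, handle, max_polls=None, poll_timeout_ms=1000):
    """Poll a batch, process every record, then commit once per batch.

    `consumer` is a kafka-python KafkaConsumer created with
    enable_auto_commit=False; `handle` is a caller-supplied function that
    processes one record. `max_polls` only exists to stop the loop.
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        # poll() returns {TopicPartition: [ConsumerRecord, ...]}
        batch = consumer.poll(timeout_ms=poll_timeout_ms)
        if not batch:
            continue
        for records in batch.values():
            for record in records:
                handle(record)
        # With no arguments, commit() commits the positions reached by the
        # last poll(), i.e. last processed offset + 1 for each partition.
        consumer.commit()
```

Committing after processing gives at-least-once delivery: if commit() raises kafka.errors.CommitFailedError after a rebalance, the uncommitted batch will be delivered again to whichever member now owns the partition, so handle should tolerate duplicates.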

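I have already gone through "Difference between session.timeout.ms and max.poll.interval.ms for Kafka 0.10.0.0 and later versions" to understand my problem. Any help with tuning the poll, session, or heartbeat timings is much appreciated.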
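Since KIP-62 (Kafka 0.10.1, implemented in kafka-python 1.4.x), heartbeats are sent from a background thread, so session_timeout_ms governs how quickly a dead process is detected, while max_poll_interval_ms bounds the processing time between poll() calls. The error above refers to the latter, so raising session_timeout_ms alone is unlikely to help. For the heartbeat side, Kafka's documentation says heartbeat.interval.ms must be lower than session.timeout.ms and typically no higher than a third of it; a small check of that rule (the function name is illustrative):

```python
def timeout_warnings(session_timeout_ms, heartbeat_interval_ms):
    """Check heartbeat settings against the Kafka guidance that
    heartbeat.interval.ms be lower than session.timeout.ms and
    typically no higher than a third of it."""
    warnings = []
    if heartbeat_interval_ms >= session_timeout_ms:
        warnings.append("heartbeat_interval_ms must be lower than session_timeout_ms")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append("heartbeat_interval_ms is above 1/3 of session_timeout_ms")
    return warnings


print(timeout_warnings(10000, 3000))  # [] (kafka-python's defaults)
print(timeout_warnings(10000, 5000))  # ['heartbeat_interval_ms is above 1/3 of session_timeout_ms']
```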

Apache Kafka: 2.11-2.1.0 (Kafka 2.1.0 built for Scala 2.11), kafka-python: 1.4.4

Sel*_*m G 0

The consumer's session.timeout.ms must not exceed the group.max.session.timeout.ms currently configured on the Kafka broker.
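More precisely, when a consumer joins a group the broker rejects any session timeout outside the inclusive range from group.min.session.timeout.ms to group.max.session.timeout.ms (error INVALID_SESSION_TIMEOUT), so there is a lower bound as well. An illustration of that check; the function name is illustrative and the broker values must be read from your own broker configuration:

```python
def session_timeout_accepted(session_timeout_ms, group_min_session_timeout_ms,
                             group_max_session_timeout_ms):
    """Illustrates the broker-side rule applied on JoinGroup: the requested
    session timeout must lie within
    [group.min.session.timeout.ms, group.max.session.timeout.ms]."""
    return group_min_session_timeout_ms <= session_timeout_ms <= group_max_session_timeout_ms


# Example broker values only; check your broker's actual settings.
print(session_timeout_accepted(10000, 6000, 300000))  # True
```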