Sha*_*eel 5 python python-3.x apache-kafka kafka-python
I have a consumer script that processes each message and manually commits the offset to the topic.
import json

from kafka import KafkaConsumer
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.structs import OffsetAndMetadata, TopicPartition

CONSUMER = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset="earliest",
    max_poll_records=100,
    enable_auto_commit=False,
    group_id=CONSUMER_GROUP,
    # Use the RoundRobinPartition method
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

count = 0
while True:
    count += 1
    LOGGER.info("--------------Poll {0}---------".format(count))
    for msg in CONSUMER:
        # Process msg.value
        # Commit offset to topic
        tp = TopicPartition(msg.topic, msg.partition)
        offsets = {tp: OffsetAndMetadata(msg.offset, None)}
        CONSUMER.commit(offsets=offsets)
Processing each message takes less than 1 second.
I am getting this error:
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the rebalance timeout with max_poll_interval_ms,
or by reducing the maximum size of batches returned in poll()
with max_poll_records.
Process finished with exit code 1
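The settings named in the error message are keyword arguments of kafka-python's `KafkaConsumer`. As a rough sanity check of how they relate, here is a minimal sketch. The numeric values are illustrative only, and the helper names `worst_case_batch_ms` and `check_timeouts` are hypothetical, not part of kafka-python. It assumes the commonly recommended rule that `heartbeat_interval_ms` stays at or below one third of `session_timeout_ms`.

```python
# Illustrative values only; tune them for your own workload.
consumer_timeouts = {
    "max_poll_records": 100,         # as in the question
    "max_poll_interval_ms": 300000,  # max allowed gap between poll() calls
    "session_timeout_ms": 10000,     # heartbeat-based failure detection
    "heartbeat_interval_ms": 3000,   # how often heartbeats are sent
}


def worst_case_batch_ms(max_poll_records, seconds_per_message):
    """Upper bound on the time spent processing one full poll() batch."""
    return int(max_poll_records * seconds_per_message * 1000)


def check_timeouts(cfg, seconds_per_message):
    """Return a list of human-readable problems (empty if none are found)."""
    problems = []
    batch_ms = worst_case_batch_ms(cfg["max_poll_records"], seconds_per_message)
    if batch_ms >= cfg["max_poll_interval_ms"]:
        problems.append(
            "a full batch may take %d ms, exceeding max_poll_interval_ms" % batch_ms
        )
    if cfg["heartbeat_interval_ms"] * 3 > cfg["session_timeout_ms"]:
        problems.append(
            "heartbeat_interval_ms is above one third of session_timeout_ms"
        )
    return problems


# The dict can then be passed through, e.g.:
#   KafkaConsumer(KAFKA_TOPIC, group_id=CONSUMER_GROUP, **consumer_timeouts)
```

With 100 records per poll and at most 1 second per message, a full batch takes at most 100 seconds, so these particular values pass the check. Lowering `max_poll_records` or raising `max_poll_interval_ms` widens the margin.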
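What I would like to know:
a) How do I fix this error?
b) How can I make sure my manual commit is working correctly?
c) What is the correct way to commit offsets?
I have already gone through "Difference between session.timeout.ms and max.poll.interval.ms for Kafka 0.10.0.0 and later versions", but I still cannot explain my issue. Any help with tuning the poll, session, or heartbeat timeouts is much appreciated.
Apache Kafka: 2.11-2.1.0, kafka-python: 1.4.4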
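For point (c), one common pattern is to call `poll()` explicitly and commit once per batch, using the offset of the next message to read (last processed offset + 1) rather than the offset of the message just handled. The following is a minimal sketch of that pattern, not a confirmed fix for the error above. It assumes kafka-python 1.4.x, where `OffsetAndMetadata` takes `(offset, metadata)`. The names `next_offsets`, `consume`, and the `handle` callback are hypothetical.

```python
def next_offsets(batch):
    """Map each partition of a poll() result to the offset to commit.

    `batch` has the shape returned by KafkaConsumer.poll():
    {TopicPartition: [ConsumerRecord, ...]}. The committed offset is the
    position of the next record to read, i.e. last offset + 1.
    """
    return {
        tp: records[-1].offset + 1
        for tp, records in batch.items()
        if records
    }


def consume(consumer, handle, poll_timeout_ms=1000):
    """Poll, process, and commit once per batch (runs until interrupted)."""
    # Imported lazily so that next_offsets() can be used without kafka-python.
    # Assumption: the two-argument OffsetAndMetadata of kafka-python 1.4.x.
    from kafka.structs import OffsetAndMetadata

    while True:
        batch = consumer.poll(timeout_ms=poll_timeout_ms)
        for records in batch.values():
            for record in records:
                handle(record.value)  # hypothetical per-message processing
        positions = next_offsets(batch)
        if positions:
            consumer.commit(
                offsets={
                    tp: OffsetAndMetadata(offset, None)
                    for tp, offset in positions.items()
                }
            )
```

Here `consume(CONSUMER, handle)` would replace the `for msg in CONSUMER` loop. Committing per batch instead of per message reduces the number of commit round trips, at the cost of reprocessing up to one batch after a crash.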