Dol*_*hin · python, apache-kafka
When I use the kafka-python (1.4.4) library to consume messages (Kafka 1.1.0, Python 3.7), it throws the error below again and again, and I can't tell what's wrong. Here is my Python code; first, the consumer initialization:
from kafka import KafkaConsumer

consumer = KafkaConsumer('dolphin-spider-google-book-bookinfo',
                         bootstrap_servers=['mq-server:9092'],
                         group_id="google-book",
                         client_id="dolphin-pipline-google-bookinfo-consumer-foolman",
                         # Manage Kafka offsets manually
                         enable_auto_commit=False,
                         consumer_timeout_ms=50000,
                         # Consume from the beginning
                         auto_offset_reset="earliest",
                         max_poll_interval_ms=350000,
                         session_timeout_ms=60000,
                         request_timeout_ms=700000)
Here is the consume logic:
def consume_bookinfo(self):
    while True:
        try:
            for books in self.consumer:
                logger.info("Get books info offset: %s", books.offset)
                self.sub_process_handle(books.value, books.offset)
        except Exception as e:
            logger.error(e)

def sub_process_handle(self, bookinfo, offset):
    number_of_threadings = len(threading.enumerate())
    if number_of_threadings < 13:
        t = threading.Thread(target=self.background_process,
                             name="offset-" + str(offset),
                             args=(bookinfo,), kwargs={})
        t.start()
    else:
        # If all worker threads are busy, handle in the main thread
        # to slow down the Kafka consume speed
        logger.info("Reach max handle thread,sleep 20s to wait thread release...")
        time.sleep(20)
        self.sub_process_handle(bookinfo, offset)

def background_process(self, bookinfo):
    self.parse_bookinfo(bookinfo)
    self.consumer.commit_async(callback=self.offset_commit_result)
I start multiple threads to handle the consume logic, but after running for a while it throws this error:
2019-01-30 02:46:52,948 - /home/dolphin/source/dolphin-pipline/dolphin/biz/spider_bookinfo_consumer.py[line:37] - INFO: Get books info offset: 9304
2019-01-30 02:46:52,948 - /home/dolphin/source/dolphin-pipline/dolphin/biz/spider_bookinfo_consumer.py[line:51] - INFO: Reach max handle thread,sleep 20s to wait thread release...
2019-01-30 02:47:12,968 - /home/dolphin/source/dolphin-pipline/dolphin/biz/spider_bookinfo_consumer.py[line:61] - INFO: commit offset success,offsets: {TopicPartition(topic='dolphin-spider-google-book-bookinfo', partition=0): OffsetAndMetadata(offset=9305, metadata='')}
2019-01-30 04:27:47,322 - /usr/local/lib/python3.5/site-packages/kafka/coordinator/base.py[line:964] - WARNING: Heartbeat session expired, marking coordinator dead
2019-01-30 04:27:47,323 - /usr/local/lib/python3.5/site-packages/kafka/coordinator/base.py[line:698] - WARNING: Marking the coordinator dead (node 0) for group google-book: Heartbeat session expired.
2019-01-30 04:27:47,433 - /usr/local/lib/python3.5/site-packages/kafka/cluster.py[line:353] - INFO: Group coordinator for google-book is BrokerMetadata(nodeId=0, host='35.229.69.193', port=9092, rack=None)
2019-01-30 04:27:47,433 - /usr/local/lib/python3.5/site-packages/kafka/coordinator/base.py[line:676] - INFO: Discovered coordinator 0 for group google-book
2019-01-30 04:27:47,433 - /usr/local/lib/python3.5/site-packages/kafka/coordinator/consumer.py[line:341] - INFO: Revoking previously assigned partitions {TopicPartition(topic='dolphin-spider-google-book-bookinfo', partition=0)} for group google-book
2019-01-30 04:27:47,433 - /usr/local/lib/python3.5/site-packages/kafka/coordinator/base.py[line:434] - INFO: (Re-)joining group google-book
2019-01-30 04:27:47,437 - /usr/local/lib/python3.5/site-packages/kafka/coordinator/base.py[line:504] - INFO: Elected group leader -- performing partition assignments using range
2019-01-30 04:27:47,439 - /usr/local/lib/python3.5/site-packages/kafka/coordinator/base.py[line:333] - INFO: Successfully joined group google-book with generation 470
2019-01-30 04:27:47,439 - /usr/local/lib/python3.5/site-packages/kafka/consumer/subscription_state.py[line:257] - INFO: Updated partition assignment: [TopicPartition(topic='dolphin-spider-google-book-bookinfo', partition=0)]
2019-01-30 04:27:47,439 - /usr/local/lib/python3.5/site-packages/kafka/coordinator/consumer.py[line:238] - INFO: Setting newly assigned partitions {TopicPartition(topic='dolphin-spider-google-book-bookinfo', partition=0)} for group google-book
2019-01-30 04:27:47,694 - /home/dolphin/source/dolphin-pipline/dolphin/biz/spider_bookinfo_consumer.py[line:63] - ERROR: commit offset failed,detail: CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the rebalance timeout with max_poll_interval_ms,
or by reducing the maximum size of batches returned in poll()
with max_poll_records.
2019-01-30 04:27:47,694 - /home/dolphin/source/dolphin-pipline/dolphin/biz/spider_bookinfo_consumer.py[line:63] - ERROR: commit offset failed,detail: CommitFailedError: Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll()
was longer than the configured max_poll_interval_ms, which
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the rebalance timeout with max_poll_interval_ms,
or by reducing the maximum size of batches returned in poll()
with max_poll_records.
How can I avoid this problem? What should I do?
Let's first look at what can cause this error. As discussed in the official Kafka consumer documentation (here), Kafka detects that a consumer is alive through its calls to poll():
After subscribing to a set of topics, the consumer will automatically join the group when poll(Duration) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for the duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.
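To make the two timeouts concrete, here is a minimal sketch of how these liveness settings map onto kafka-python's KafkaConsumer arguments (the topic, broker address, group id, and values are placeholders, not recommendations):

from kafka import KafkaConsumer

# Minimal sketch: liveness-related settings in kafka-python.
# Topic, broker address, and group id are placeholders.
consumer = KafkaConsumer(
    'some-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='some-group',
    # A background thread sends a heartbeat every heartbeat_interval_ms;
    # if the broker sees none within session_timeout_ms, the consumer is
    # declared dead and its partitions are reassigned.
    heartbeat_interval_ms=3000,
    session_timeout_ms=30000,
    # Independently, successive poll() calls must be no more than
    # max_poll_interval_ms apart, or the consumer is evicted from the group.
    max_poll_interval_ms=300000,
)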
So to stay in the group, you must keep calling poll. The max.poll.interval.ms setting controls how long a consumer may stay in the group without calling poll(). Each call to poll() returns a batch of records (up to 500 by default, governed by max.poll.records), which you then iterate over in for message in consumer; the next poll() only happens once all of the returned records have been processed. If processing a batch takes longer than max.poll.interval.ms, you are kicked out of the group.
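To see where the deadline bites, here is a rough sketch of the loop that for message in consumer runs for you implicitly; handle() and the numeric values are illustrative assumptions:

# Rough sketch of the implicit poll loop (handle() is a placeholder).
while True:
    # poll() fetches up to max_records messages and also acts as the
    # group-membership signal: it must run again within max_poll_interval_ms.
    batch = consumer.poll(timeout_ms=1000, max_records=100)
    for tp, records in batch.items():
        for record in records:
            handle(record)  # total time spent here, per batch, must stay
                            # under max_poll_interval_ms
    consumer.commit()  # synchronous commit once the batch is done

If handling one batch takes longer than max_poll_interval_ms, the next poll() arrives too late, the broker rebalances the group, and you get exactly the CommitFailedError shown above.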
You can do the following:
Tune max.poll.interval.ms and max.poll.records. In @Dolphin's answer, he is effectively reducing max.poll.records to 1. I prefer to do it like this:
self.consumer = kafka.KafkaConsumer(topic,
                                    bootstrap_servers='servers:ports',
                                    group_id='group_id',
                                    max_poll_records=1,
                                    max_poll_interval_ms=300000)
The important part is max_poll_records=1. Of course, you may well want to set this to a value greater than 1.
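Putting it together, a minimal consume-and-commit loop under this configuration might look like the following; the topic, broker address, group id, and process() are placeholders:

import kafka

consumer = kafka.KafkaConsumer('some-topic',
                               bootstrap_servers='localhost:9092',
                               group_id='some-group',
                               enable_auto_commit=False,
                               max_poll_records=1,
                               max_poll_interval_ms=300000)

def process(value):
    pass  # placeholder for the real per-message work

for message in consumer:
    process(message.value)  # keep per-message work well under max_poll_interval_ms
    consumer.commit()       # synchronous commit before the next poll

With max_poll_records=1, each pass through the loop polls a single record, so the time between poll() calls is just the cost of handling one message plus one commit, which keeps the consumer comfortably inside the deadline.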