Kafka Consumer 使用 python 轮询消息

soa*_*soa 5 python consumer apache-kafka kafka-python

我在消费者组中轮询来自 Kafka 的消息时遇到问题。\n我的消费者对象分配给给定的分区

\n\n
self.ps = TopicPartition(topic, partition )\n
Run Code Online (Sandbox Code Playgroud)\n\n

之后消费者分配给该分区:

\n\n
self.consumer.assign([self.ps])\n
Run Code Online (Sandbox Code Playgroud)\n\n

之后我就可以计算分区内的消息了

\n\n
self.consumer.seek_to_beginning(self.ps)\npos = self.consumer.position(self.ps)\n
Run Code Online (Sandbox Code Playgroud)\n\n

self.consumer.seek_to_end(self.ps)\n......

\n\n

我的主题中有超过 30000 条消息。\n问题是我只收到一条消息。

\n\n

消费者配置为:\n max_poll_records= 200\nAUTO_OFFSET_RESET是最早的

\n\n

这是我的功能,我试图获取消息:

\n\n
 def poll_messages(self):\n\n\n    data = []\n\n    messages = self.consumer.poll(timeout_ms=6000)\n\n\n    for partition, msgs in six.iteritems(messages):\n\n        for msg in msgs:\n\n            data.append(msg)\n\n    return data\n
Run Code Online (Sandbox Code Playgroud)\n\n

即使我在开始轮询消息之前转到第一个可用偏移量\n我也只收到一条消息。

\n\n
self.consumer.seek(self.ps, self.get_first_offset())\n
Run Code Online (Sandbox Code Playgroud)\n\n

我希望有人能解释我做错了什么。\n提前致谢。

\n\n

最美好的祝愿\nJ\xc3\xb6rn

\n

Nic*_*ick 0

我相信您误解了 max_poll_records - 这并不意味着您每次民意调查都会得到 200 个记录,而只是对您可能获得的最多记录的限制。您需要多次调用 poll。我建议您参阅文档以获取简单的示例:http://kafka-python.readthedocs.io/en/master/usage.html

我相信更标准的实现是:

for message in self.consumer:
  # do stuff like:
  print(msg)
Run Code Online (Sandbox Code Playgroud)

  • 发生的情况是,如果队列中没有消息(没有任何可读取的内容),则 for 循环不会移动。这本身并不是问题,但它会降低您的灵活性。上面的“soa”代码使用轮询,它将在队列中等待几秒钟,然后执行其他操作。我认为“soa”正在寻找轮询解决方案。 (4认同)
  • 不幸的是尼克,我相信你的例子是一个阻塞调用 (3认同)