如何在融合的kafka python中读取批处理消息?

sha*_*eel 2 python apache-kafka confluent-kafka

我正在尝试阅读来自Kafka的消息,因此我编写了简单的消费者来阅读来自Kafka的消息.

While True:
        message = consumer.poll(timeout=1.0)
        # i am doing something with messages
Run Code Online (Sandbox Code Playgroud)

在上面的代码输出消息类型是消息对象.我如何获得一组消息?

有可能吗?

注意:基本的消费者配置不多.

Tre*_*iac 5

librdkafka(底层C库)只将消息逐个返回给应用程序,但在内部,消息是由代理从批处理中提取的,因此没有性能下降.消息在内部缓冲区中排队,等待您的应用轮询.

有调整行为的配置:

fetch.wait.max.ms(默认为100),给予代理累积数据发送的时间 fetch.message.max.bytes(默认为1048576,1GB),批量的最大大小 queued.max.messages.kbytes(默认为1000000),内部队列中的最大数据大小.如果您不定期轮询,则不会从队列中清除数据,您将无法再获取任何数据.

您可以在此处找到许多其他内容:https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md


如果你真的想要一个数据数组,因为你处理数据的方式,你可以做的是像你一样在循环中使用低超时调用poll,并在你有x消息时或在y ms之后停止循环,将它们累积在一个集合.处理生成的数组并重复循环.

生产也是如此:您逐个生成数据,但是在发送给经纪人之前对消息进行批处理.

  • 以前就是这种情况,但经过基准测试,由于分配的完成方式,一次返回一条消息与返回一批消息没有任何缺点(在 C 中)。您可以在循环中使用 poll(0) 来创建批处理 - 我不太了解 python,但也许 GitHub 上有一个问题(或者您可以讨论这个问题),这比堆栈溢出更适合这个问题 (2认同)