是否有用于事件驱动的 Kafka 消费者的 Python API?

Sha*_*ank 5 python events listener flask apache-kafka

我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序。出于这个原因,我想要一个 Kafka 消费者,当相关主题的流中有新消息时触发它,并通过将消息推回 Kafka 流来响应。

我一直在寻找类似 Spring 实现的东西:

@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
    System.out.println("Received Messasge in group mygroup: " + message);
}
Run Code Online (Sandbox Code Playgroud)

我看过:

  1. 卡夫卡蟒蛇
  2. 卡夫卡
  3. 融合卡夫卡

但是我在 Python 中找不到与事件驱动的实现风格相关的任何内容。

Sha*_*ank 9

这是@MickaelMaison's answer给出的想法的实现。我使用了kafka-python

from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
# Poll kafka
    def poll():
        # Initialize consumer Instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        consumer.poll(timeout_ms=6000)
        print("Started Polling for topic:", topic)
        for msg in consumer:
            print("Entered the loop\nKey: ",msg.key," Value:", msg.value)
            kafka_listener(msg)
    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("started a background thread")

def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)
Run Code Online (Sandbox Code Playgroud)

轮询是在不同的线程中完成的。收到消息后,通过传递从 Kafka 检索到的数据来调用侦听器。


Mic*_*son 3

Kafka Consumer 必须不断轮询才能从代理检索数据。

Spring 为您提供了这个奇特的 API,但在幕后,它在循环中调用 poll,并且仅在检索记录时调用您的方法。

您可以使用您提到的任何 Python 客户端轻松构建类似的东西。与 Java 一样,这不是(大多数)Kafka 客户端直接公开的 API,而是由顶层提供的 API。这是需要构建的东西。