使用 python 将列表发送到 Kafka 生产者

Chr*_*s X 1 list apache-kafka kafka-producer-api

我有一些列表想通过卡夫卡生产者发送。

listA [1,2,3]
listB ["cat", "dog", "fish"]
Run Code Online (Sandbox Code Playgroud)

生产者以字节形式发送消息,因此我不确定如何正确设置消息,以便在需要引号来发送消息时发送列表。这就是我目前所拥有的。

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(1):
        print(producer.send('test', b'"worker_id": listA ,  "worker_name" : listB'))
Run Code Online (Sandbox Code Playgroud)

这个方法只是给我一个语法错误。我也尝试过下面的方法,得到了类似的结果

print(producer.send('test', b("worker_uuid": worker_uuid))
Run Code Online (Sandbox Code Playgroud)

dpk*_*pkp 7

您考虑过 JSON 编码吗?如果您使用 value_serializer 配置 KafkaProducer,如下所示:

KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)

那么你应该能够直接发送你的列表,如下所示:

producer.send('test', [1, 2, 3])
producer.send('test', ["cat", "dog", "fish"])
Run Code Online (Sandbox Code Playgroud)

消费者还需要配置为通过 json 进行解码。如果您使用 kafka-python,您可能会执行以下操作:

KafkaConsumer(value_deserializer=lambda v: json.loads(v.decode('utf-8')))
Run Code Online (Sandbox Code Playgroud)