Chr*_*s X 1 list apache-kafka kafka-producer-api
我有一些列表想通过卡夫卡生产者发送。
listA [1,2,3]
listB ["cat", "dog", "fish"]
Run Code Online (Sandbox Code Playgroud)
生产者以字节形式发送消息,因此我不确定如何正确设置消息,以便在需要引号来发送消息时发送列表。这就是我目前所拥有的。
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(1):
print(producer.send('test', b'"worker_id": listA , "worker_name" : listB'))
Run Code Online (Sandbox Code Playgroud)
这个方法只是给我一个语法错误。我也尝试过下面的方法,得到了类似的结果
print(producer.send('test', b("worker_uuid": worker_uuid))
Run Code Online (Sandbox Code Playgroud)
您考虑过 JSON 编码吗?如果您使用 value_serializer 配置 KafkaProducer,如下所示:
KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)
那么你应该能够直接发送你的列表,如下所示:
producer.send('test', [1, 2, 3])
producer.send('test', ["cat", "dog", "fish"])
Run Code Online (Sandbox Code Playgroud)
消费者还需要配置为通过 json 进行解码。如果您使用 kafka-python,您可能会执行以下操作:
KafkaConsumer(value_deserializer=lambda v: json.loads(v.decode('utf-8')))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5048 次 |
| 最近记录: |