Rah*_*hul 9 python json apache-kafka kafka-python
我有一个简单的 JSON 对象,如下所示
d = { 'tag ': 'blah',
'name' : 'sam',
'score':
{'row1': 100,
'row2': 200
}
}
Run Code Online (Sandbox Code Playgroud)
以下是我的 python 代码,它将消息发送到 Kafka
from kafka import SimpleProducer, KafkaClient
import json
# To send messages synchronously
kafka = KafkaClient('10.20.30.12:9092')
producer = SimpleProducer(kafka)
jd = json.dumps(d)
producer.send_messages(b'message1',jd)
Run Code Online (Sandbox Code Playgroud)
我在风暴日志中看到消息正在被接收,但它抛出 Transformation null for tuple { json structure in here } 不确定需要做什么才能解决这个问题?..
Kafka 需要以字节为单位的值
b`some json message`
Run Code Online (Sandbox Code Playgroud)
这是我的简单 Kafka 生产者,它将消息发送到 Kafka 服务器。
import json
from bson import json_util
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(10):
data = { 'tag ': 'blah',
'name' : 'sam',
'index' : i,
'score':
{'row1': 100,
'row2': 200
}
}
producer.send('orders', json.dumps(data, default=json_util.default).encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)
这里 json.dumps() 将 json 转换为字符串,encode('utf-8') 将字符串转换为字节数组。
以下是我的生产者到 kafka 的代码。我做的唯一不同的是使用yaml.safe_load加载 json 内容。它将内容作为字符串而不是 unicode 返回。以下是片段
with open('smaller_test_prod.txt') as f:
for line in f:
d = yaml.safe_load(line)
jd = json.dumps(d)
producer.send_messages(b'zeus_metrics',jd)
Run Code Online (Sandbox Code Playgroud)
这里每一行都是存储在文件中的 json 数据。
| 归档时间: |
|
| 查看次数: |
35467 次 |
| 最近记录: |