如何使用 kafka-console-producer 将 json 文件数据插入到 kafka 主题中?每个json数据集可以存储为一条消息吗?
例子-
{
"id": 1,
"first_name": "John",
"last_name": "Lindt",
"email": "jlindt@gmail.com",
"gender": "Male",
"ip_address": "1.2.3.4"
}
Run Code Online (Sandbox Code Playgroud)
如果您使用此命令 -
cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic
Run Code Online (Sandbox Code Playgroud)
每行都被视为一个单独的消息。
这样做的正确方法是什么?
谢谢!
ps-
弹性搜索正在阅读该主题。示例 json 消息文件 -
[{
"id": 1,
"first_name": "John",
"last_name": "Lindt",
"email": "jlindt@gmail.com",
"gender": "Male",
"ip_address": "1.2.3.4"
}, {
"id": 2,
"first_name": "Peter",
"last_name": "Friz",
"email": "Friz3@gmail.com",
"gender": "Male",
"ip_address": "4.5.6.7"
}, {
"id": 3,
"first_name": "Dell",
"last_name": "Chang",
"email": "Dellc@gmail.com",
"gender": "Female",
"ip_address": "8.9.10.11"
}, {
"id": 4,
"first_name": "Lolita",
"last_name": "John",
"email": "LolitaJ@gmail.com",
"gender": "Female",
"ip_address": "12.13.14.15"
}, {
"id": 5,
"first_name": "Pele",
"last_name": "Wang",
"email": "Pele@gmail.com",
"gender": "Male",
"ip_address": "16.17.18.19"
}, {
"id": 6,
"first_name": "Rene",
"last_name": "Charm",
"email": "Rene3@gmail.com",
"gender": "Male",
"ip_address": "20.21.22.23"
}]
Run Code Online (Sandbox Code Playgroud)
Nis*_*yal 12
如果文件中有 JSON 消息,则可以使用以下方式写入 kafka 主题:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline < samplerecords.json
Run Code Online (Sandbox Code Playgroud)
Kafka 生产者使用 default 逐行读取消息LineMessageReader。默认键和值序列化程序是StringSerializer. 它不会验证是否存在正确的 json,而是将原始字符串对象视为发布到 kafka 主题。但是如果你想验证你可以在 console-producer 命令中定义以下配置。
key.serializer
value.serializer
Run Code Online (Sandbox Code Playgroud)
例子 :
kafka-console-producer --broker-list localhost:9092 --topic testTopic--property value.serializer=custom.class.serialization.JsonSerializer
Run Code Online (Sandbox Code Playgroud)
在消费者方面,您可以采用类似的方法。使用 JsonDeserializer 读取数据。
war*_*iak 11
从 Kafka 的角度来看,每条消息都是字节数组。这取决于客户端的应用程序(生产者、消费者等),它如何对待它。Kafka Producer、Consumer 使用 Deserializer、Serializer 将字节数组转换为业务对象(字符串、POJO)
您面临的问题是 Kafka Console 生产者从标准输入读取消息的方式。默认情况下它使用LineMessageReader,它将每一行视为新消息。您可以实现自己的,或者在发送之前将 json 中的每个新行字符转换为其他一些空格。
例如,您可以使用以下命令:
jq -rc . sampledata.json | kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic
小智 5
我也是 Kafka 的新手,并且有和你一样的用例。经过一番研究后,我找到了一个可能对您有帮助的简短答案。你可以写如下内容:
bin/kafka-console-producer --broker-list localhost:9092 --topic blogpost
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}.
Run Code Online (Sandbox Code Playgroud)
欲了解更详细的信息,请单击此处
| 归档时间: |
|
| 查看次数: |
31339 次 |
| 最近记录: |