json文件数据转入kafka主题

Ton*_*ony 9 apache-kafka

如何使用 kafka-console-producer 将 json 文件数据插入到 kafka 主题中?每个json数据集可以存储为一条消息吗?

例子-

{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}
Run Code Online (Sandbox Code Playgroud)

如果您使用此命令 -

cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic  stream-test-topic
Run Code Online (Sandbox Code Playgroud)

每行都被视为一个单独的消息。

这样做的正确方法是什么?

谢谢!

ps-

弹性搜索正在阅读该主题。示例 json 消息文件 -

[{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}, {
  "id": 2,
  "first_name": "Peter",
  "last_name": "Friz",
  "email": "Friz3@gmail.com",
  "gender": "Male",
  "ip_address": "4.5.6.7"
}, {
  "id": 3,
  "first_name": "Dell",
  "last_name": "Chang",
  "email": "Dellc@gmail.com",
  "gender": "Female",
  "ip_address": "8.9.10.11"
}, {
  "id": 4,
  "first_name": "Lolita",
  "last_name": "John",
  "email": "LolitaJ@gmail.com",
  "gender": "Female",
  "ip_address": "12.13.14.15"
}, {
  "id": 5,
  "first_name": "Pele",
  "last_name": "Wang",
  "email": "Pele@gmail.com",
  "gender": "Male",
  "ip_address": "16.17.18.19"
}, {
  "id": 6,
  "first_name": "Rene",
  "last_name": "Charm",
  "email": "Rene3@gmail.com",
  "gender": "Male",
  "ip_address": "20.21.22.23"
}]
Run Code Online (Sandbox Code Playgroud)

Nis*_*yal 12

如果文件中有 JSON 消息,则可以使用以下方式写入 kafka 主题:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline < samplerecords.json
Run Code Online (Sandbox Code Playgroud)

Kafka 生产者使用 default 逐行读取消息LineMessageReader。默认键和值序列化程序是StringSerializer. 它不会验证是否存在正确的 json,而是将原始字符串对象视为发布到 kafka 主题。但是如果你想验证你可以在 console-producer 命令中定义以下配置。

key.serializer
value.serializer
Run Code Online (Sandbox Code Playgroud)

例子 :

kafka-console-producer --broker-list localhost:9092 --topic testTopic--property value.serializer=custom.class.serialization.JsonSerializer 
Run Code Online (Sandbox Code Playgroud)

在消费者方面,您可以采用类似的方法。使用 JsonDeserializer 读取数据。

  • 我发现这个对我不起作用,尽管我也尝试使用 key.serializer。 (2认同)

war*_*iak 11

从 Kafka 的角度来看,每条消息都是字节数组。这取决于客户端的应用程序(生产者、消费者等),它如何对待它。Kafka Producer、Consumer 使用 Deserializer、Serializer 将字节数组转换为业务对象(字符串、POJO)

您面临的问题是 Kafka Console 生产者从标准输入读取消息的方式。默认情况下它使用LineMessageReader,它将每一行视为新消息。您可以实现自己的,或者在发送之前将 json 中的每个新行字符转换为其他一些空格。

例如,您可以使用以下命令:

jq -rc . sampledata.json | kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic

  • 使用 jq -rc 的简单性非常出色!如果您不需要验证 jason,那么将其放在一行中就足以让 kafka-console-生产者将其写入单个消息。这当然对我有用。 (2认同)
  • 这应该是评价最高的回复,因为它完美地回答了最初的问题:如何将多行 json 传递给 Kafka 生产者。 (2认同)

小智 5

我也是 Kafka 的新手,并且有和你一样的用例。经过一番研究后,我找到了一个可能对您有帮助的简短答案。你可以写如下内容:

bin/kafka-console-producer --broker-list localhost:9092 --topic blogpost
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}.
Run Code Online (Sandbox Code Playgroud)

欲了解更详细的信息,请单击此处