我有一个场景从我的 Kafka 主题中读取 JSON 数据,通过使用 Kafka 0.11 版本,我需要编写 Java 代码来流式传输 Kafka 主题中存在的 JSON 数据。我的输入是一个包含字典数组的 Json 数据。
现在我的要求是从 json 数据中获取“文本”字段,键入包含在数组中的字典,并通过 Kafka Streaming 将所有这些文本推文传递给另一个主题。
我写代码到这里。请帮我解析数据。
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> personstwitter =builder.stream(Serdes.String(), jsonSerde, "Persons");//taking the json node as input
personstwitter.to(Serdes.String(), jsonSerde,"Persons-output");
Run Code Online (Sandbox Code Playgroud)