使用 Apache Kafka Streaming 解析 JSON 数据

Mou*_*uni 3 parsing jsonschema apache-kafka-streams

我需要从 Kafka 主题中读取 JSON 数据。我使用的是 Kafka 0.11 版本,需要编写 Java 代码,以流的方式处理该主题中的 JSON 数据。我的输入是一段包含字典数组的 JSON 数据。

现在我的需求是:从 JSON 数据中取出数组内每个字典的 "text" 键对应的值,并通过 Kafka Streams 把所有这些推文文本发送到另一个主题。

我写代码到这里。请帮我解析数据。

用于流式传输的 Java 代码

// Build a Serde for JsonNode values from a serializer/deserializer pair.
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStreamBuilder builder = new KStreamBuilder();

// Source stream on the "Persons" topic: String keys, JsonNode values.
KStream<String, JsonNode> personstwitter =builder.stream(Serdes.String(), jsonSerde, "Persons");//taking the json node as input


// Records are written to "Persons-output" as-is; no transformation is applied
// between the source and this sink.
personstwitter.to(Serdes.String(), jsonSerde,"Persons-output");
Run Code Online (Sandbox Code Playgroud)

wan*_*onk 7

我建议您通过以下方式更好地控制 JSON 数据。

  1. 编写一个 Serializer 和一个 Deserializer
  2. 基于 JSON 字符串创建一个 POJO。POJO 是对数据进行更多控制的最佳方式。
  3. 将数据映射到 POJO 以访问所需的数据。

POJO:

/**
 * Value object describing a person, mapped to and from JSON with Jackson.
 *
 * <p>The JSON property names are {@code name}, {@code personalID},
 * {@code country} and {@code occupation}, as declared on the
 * {@link JsonCreator} constructor. The class is annotated with the root name
 * {@code "person"}.
 */
@JsonRootName("person")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    /** Creates a person with every field left {@code null}. */
    public Person() {
    }

    /**
     * Creates a fully populated person; used by Jackson when binding JSON.
     *
     * @param name       value of the {@code name} property
     * @param personalID value of the {@code personalID} property
     * @param country    value of the {@code country} property
     * @param occupation value of the {@code occupation} property
     */
    @JsonCreator
    public Person(@JsonProperty("name") String name,
            @JsonProperty("personalID") String personalID,
            @JsonProperty("country") String country,
            @JsonProperty("occupation") String occupation) {
        this.name = name;
        this.personalID = personalID;
        this.country = country;
        this.occupation = occupation;
    }

    /** @return the person's name, possibly {@code null} */
    public String getName() {
        return name;
    }

    /** @param name the person's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the personal identifier, possibly {@code null} */
    public String getPersonalID() {
        return personalID;
    }

    /** @param personalID the personal identifier */
    public void setPersonalID(String personalID) {
        this.personalID = personalID;
    }

    /** @return the country, possibly {@code null} */
    public String getCountry() {
        return country;
    }

    /** @param country the country */
    public void setCountry(String country) {
        this.country = country;
    }

    /** @return the occupation, possibly {@code null} */
    public String getOccupation() {
        return occupation;
    }

    /** @param occupation the occupation */
    public void setOccupation(String occupation) {
        this.occupation = occupation;
    }
}
Run Code Online (Sandbox Code Playgroud)

序列化器:

public class JsonSerializer<T> implements Serializer<T> {

    private ObjectMapper om = new ObjectMapper();

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // TODO Auto-generated method stub

    }

    @Override
    public byte[] serialize(String topic, T data) {
        byte[] retval = null;
        try {
            System.out.println(data.getClass());
            retval = om.writeValueAsString(data).getBytes();
        } catch (JsonProcessingException e) {
            throw new SerializationException();
        }
        return retval;
    }

}
Run Code Online (Sandbox Code Playgroud)

反序列化器:

/**
 * Kafka {@link Deserializer} that reads JSON bytes into instances of a target
 * class using a Jackson {@link ObjectMapper}.
 *
 * <p>The target class is supplied either through the constructor or through
 * the {@code "type"} entry of the map passed to
 * {@link #configure(Map, boolean)}.
 *
 * @param <T> type of the values being produced
 */
public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /**
     * Default constructor needed by kafka; the target type must then be
     * provided via {@link #configure(Map, boolean)}.
     */
    public JsonDeserializer() {

    }

    /**
     * @param type class that incoming JSON is bound to
     */
    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    /** No resources are held, so there is nothing to release. */
    @Override
    public void close() {
        // Intentionally empty.
    }

    /**
     * Picks up the target class from the {@code "type"} config entry, unless a
     * type was already supplied through the constructor.
     *
     * @param map  configuration entries
     * @param arg1 whether this instance deserializes keys (not used)
     */
    @Override
    public void configure(Map<String, ?> map, boolean arg1) {
        if (type == null) {
            // Unchecked because the config map is untyped; the entry is
            // expected to hold the Class object for T.
            @SuppressWarnings("unchecked")
            Class<T> configured = (Class<T>) map.get("type");
            type = configured;
        }
    }

    /**
     * Binds the JSON payload to an instance of the target type.
     *
     * @param topic topic the record came from (used in error messages only)
     * @param bytes raw payload; may be {@code null} or empty
     * @return the deserialized value, or {@code null} for a {@code null} or
     *         empty payload
     * @throws SerializationException if no target type is set or the payload
     *         cannot be bound to it
     */
    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        // Fail with a clear message instead of an obscure Jackson error when
        // neither the constructor nor configure() supplied a type.
        if (type == null) {
            throw new SerializationException(
                    "No target type set for JsonDeserializer; pass it to the "
                    + "constructor or provide the \"type\" config entry");
        }

        try {
            return om.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException(
                    "Error deserializing JSON message from topic " + topic, e);
        }
    }

    /**
     * @return the class incoming JSON is bound to, or {@code null} if none has
     *         been set yet
     */
    protected Class<T> getType() {
        return type;
    }

}
Run Code Online (Sandbox Code Playgroud)

消费者:

/**
 * Helpers that build and start a Kafka Streams application printing the
 * country of every {@code Person} read from the {@code "test"} topic.
 */
public class ConsumerUtilities {

    /**
     * Builds the Streams configuration used by {@link #getStreamData()}.
     *
     * @return properties holding the application id and bootstrap servers
     */
    public static Properties getProperties() {
        Properties configs = new Properties();
        // NOTE(review): the application id contains spaces. It is presumably
        // reused as a prefix for internal topic names once the topology needs
        // state stores or repartitioning, where spaces would not be legal —
        // confirm before extending this topology.
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "Kafka test application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return configs;
    }

    /**
     * @return a new, empty {@link KStreamBuilder}
     */
    public static KStreamBuilder getStreamingConsumer() {
        return new KStreamBuilder();
    }

    /**
     * Defines the topology (source topic {@code "test"} with String keys and
     * JSON-encoded {@code Person} values, followed by a print of each person's
     * country) and starts it.
     *
     * <p>Errors raised while defining the topology now propagate to the
     * caller instead of being printed and followed by a start anyway.
     */
    public static void getStreamData() {
        Serde<Person> personSerde = Serdes.serdeFrom(
                new JsonSerializer<Person>(),
                new JsonDeserializer<Person>(Person.class));
        KStreamBuilder builder = getStreamingConsumer();

        KStream<String, Person> kStream = builder.stream(Serdes.String(),
                personSerde, "test");
        kStream.foreach(new ForeachAction<String, Person>() {

            @Override
            public void apply(String key, Person person) {
                // The deserializer yields null for null/empty payloads, so
                // guard before dereferencing.
                if (person != null) {
                    System.out.println(person.getCountry());
                }
            }

        });

        final KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        // Close the streams instance on JVM shutdown so its threads and
        // clients are released cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            @Override
            public void run() {
                kafkaStreams.close();
            }

        }));
        kafkaStreams.start();
    }

}
Run Code Online (Sandbox Code Playgroud)

生产者:

/**
 * Helpers for producing JSON-encoded {@code Person} records to the
 * {@code "test"} topic.
 */
public class ProducerUtilities {

    /**
     * Creates a producer for String keys and JSON-serialized {@code Person}
     * values, connected to {@code localhost:9092}.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public static org.apache.kafka.clients.producer.Producer<String, Person> getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG,
                "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        // Keys are declared as String, so they need the String serializer; a
        // byte-array serializer would fail with ClassCastException on the
        // first non-null key.
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "com.kafka.api.serdes.JsonSerializer");

        return new KafkaProducer<String, Person>(configProperties);
    }

    /**
     * Wraps a person in a record for the {@code "test"} topic. No key is set.
     *
     * @param person value of the record
     * @return a record targeting {@code "test"} with {@code person} as value
     */
    public ProducerRecord<String, Person> createRecord(Person person) {
        return new ProducerRecord<String, Person>("test", person);
    }

}
Run Code Online (Sandbox Code Playgroud)