我是Kafka,Serialization和JSON的新手
我想要的是生产者通过kafka和消费者发送JSON文件以使用原始文件形式的JSON文件.
我能够得到它所以JSON转换为字符串并通过String Serializer发送然后消费者将解析String并重新创建一个JSON对象但我担心这不是有效的或正确的方法(可能会失去字段类型对于JSON)
所以我研究了制作JSON序列化程序并在我的制作人的配置中设置它.
我在这里使用了JsonEncoder:Kafka:编写自定义序列化器
但是当我现在尝试运行我的生产者时,似乎在编码器的toBytes函数中,try块永远不会返回任何像我想要的那样
try {
bytes = objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
Run Code Online (Sandbox Code Playgroud)
似乎objectMapper.writeValueAsString(object).getBytes(); 接受我的JSON obj({"name":"Kate","age":25})并将其转换为空,
这是我的制作人的跑步功能
List<KeyedMessage<String,JSONObject>> msgList=new ArrayList<KeyedMessage<String,JSONObject>>();
JSONObject record = new JSONObject();
record.put("name", "Kate");
record.put("age", 25);
msgList.add(new KeyedMessage<String, JSONObject>(topic, record));
producer.send(msgList);
Run Code Online (Sandbox Code Playgroud)
我错过了什么?我的原始方法(转换为字符串并发送然后重建JSON obj)是否可以?或者只是没有正确的方法去?
谢谢!