gug*_*uli 4 java json maven apache-kafka kafka-producer-api
我想在 kafka 主题中生成一条消息。该消息应该具有以下模式:
{"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}}
Run Code Online (Sandbox Code Playgroud)
我知道这是一个 json 模式,那么我如何将该 json 转换为字符串?
我使用的是maven项目,所以需要使用哪些依赖
String stringData = JSON.stringify({"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}});
Run Code Online (Sandbox Code Playgroud)
所以我认为最好不要将 Json 转换为字符串并将该消息发送到 kafka 主题。
我的代码是这样的,它可以发送一个字符串,但我不知道如何修改我的代码来发送上面的消息。也许你可以帮助我。
Producer<String, String> producer = null;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
String msg = "welcome";
producer.send(new ProducerRecord<String, String>("event", msg));
producer.close();
Run Code Online (Sandbox Code Playgroud)
这解决了我的问题:
Producer<String, String> producer = null;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
try {
producer = new KafkaProducer<String, String>(props);
} catch (Exception e) {
e.printStackTrace();
}
blobStorageChecker = new BlobStorageChecker();
String folder = blobStorageChecker.getCurrentDateUTC();
String msg = "{\"targetFileInfo\":{\"path\":\"test/"+folder+"row01-small.txt\"},\"sourceFileInfo\":{\"lastModifiedTime\":1525437960000,\"file\":\"/row01-small-01.txt\",\"filename\":\"/data/row01/row01-small.txt\",\"size\":19728,\"remoteUri\":\"ftp://waws-prod-am2-191.ftp.net/data/orsted-real/inbound/row01\",\"contentEncoding\":\"\",\"contentType\":\"\"}}";
ProducerRecord<String, String> record = new ProducerRecord<String, String>("event-orsted-v1", null, msg);
if (producer != null) {
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
}
}
producer.close();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
45696 次 |
| 最近记录: |