usm*_*man 7 java apache-kafka kafka-consumer-api kafka-producer-api
当我发出消息时,我想从经纪人那里得到一些回应.我已经尝试过使用的CallBack机制(通过实现CallBack),KafkaProducer.send但它没有工作,也没有调用onCompletion方法.
当我关闭Kafka服务器并尝试生成消息时,它会调用回调方法.
有没有其他方式得到确认?
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("Called Callback method");
if (metadata != null) {
System.out.println("message(" + key + ", " + message
+ ") sent to partition(" + metadata.partition() + "), "
+ "offset(" + metadata.offset() + ") in " + elapsedTime
+ " ms");
} else {
exception.printStackTrace();
}
}
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime();
String ip = "192.168.2."+ rnd.nextInt(255);
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`
Run Code Online (Sandbox Code Playgroud)
我使用kafka-client api 0.9.1与代理版本0.8.2.
KafkaProducer 使用 kafka 0.9 版本发布消息后,有一种简单的方法可以从代理获取信息。您可以调用 get() 方法,该方法将返回一个 RecordMetadata 对象,您可以获取偏移量、topicPartition 等信息,下面的代码片段作为示例:
RecordMetadata m = kafkaProducer.send(new ProducerRecord<byte[], byte[]>(
topic, key.getBytes("UTF-8"), message
.getBytes("UTF-8"))).get();
System.out.println("Message produced, offset: " + m.offset());
System.out.println("Message produced, partition : " + m.partition());
System.out.println("Message produced, topic: " + m.topic());
Run Code Online (Sandbox Code Playgroud)
所以我不能 100% 确定哪个版本适用于 Kafka 中的哪个版本。目前我使用 0.8.2,我知道 0.9 引入了一些重大更改,但我无法确定现在哪些有效/无效。
一个非常强烈的建议是,我会使用与您的代理版本相对应的 Kafka-Client 版本。如果您使用的是broker 0.8.2,我也会使用kakfa-client 0.8.2。
您从未提供过任何有关如何使用它的代码,所以我只是在黑暗中猜测。但我在 Kafka 0.8.2 中通过在生产者中使用此方法实现了回调功能。下面是方法签名。
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
Run Code Online (Sandbox Code Playgroud)
我将在哪里调用该方法,实际上我用重写的方法传入了类。
KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = //data to send to kafka
prod.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
//implement logic here, or call another method to process metadata
System.out.println("Callback");
}
}
});
Run Code Online (Sandbox Code Playgroud)
我假设有一种方法可以像您一样做到这一点。但是您必须提供代码来显示您实际上如何将记录发送到 Kafka。除此之外我只是猜测。
| 归档时间: |
|
| 查看次数: |
16768 次 |
| 最近记录: |