6 apache-kafka kafka-producer-api
如何在kafka中发送同步消息?
实现它的一种方法是设置属性参数
max.in.flight.requests.per.connection = 1。
但我想知道在 kafka 中是否有一种甚至直接或替代的方式发送同步消息。(类似于 producer.syncSend(...) 等)。
生产者 APIFuture从send. 你可以打电话Future#get阻塞直到发送完成。
请参阅Javadocs 中的此示例:
如果你想模拟一个简单的阻塞调用,你可以立即调用 get() 方法:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record =
new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
Run Code Online (Sandbox Code Playgroud)
正如 Thilo 建议的那样,您可以调用Future#get阻止,直到发送完成。但是,您可能会遇到一些性能问题,因为当生产者队列有batch.size元素时、当大小的缓冲区buffer.memory已满或在max.block.ms几毫秒后,生产者开始发送。
如果推送到 kafka 的线程数量有限,则每次都必须等待max.block.ms消息发送。因此在某些情况下,您会更喜欢使用:
// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush producer queue to spare queuing time
producer.flush();
// throw error when kafka is unreachable
future.get(10, TimeUnit.SECONDS);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
18661 次 |
| 最近记录: |