在 kafka 中发送同步消息?

6 apache-kafka kafka-producer-api

如何在kafka中发送同步消息?
实现它的一种方法是设置属性参数
max.in.flight.requests.per.connection = 1

但我想知道在 kafka 中是否有一种甚至直接或替代的方式发送同步消息。(类似于 producer.syncSend(...) 等)。

Thi*_*ilo 9

生产者 APIFuturesend. 你可以打电话Future#get阻塞直到发送完成。

请参阅Javadocs 中的示例

如果你想模拟一个简单的阻塞调用,你可以立即调用 get() 方法:

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = 
     new ProducerRecord<byte[],byte[]>("my-topic", key, value)
 producer.send(record).get();
Run Code Online (Sandbox Code Playgroud)

  • 按照本答案中的建议调用 get() 只会告诉您消息已完全发送。这在提供模拟同步调用的解决方案方面没有任何作用。涉及 kafka 的真正解决方案将涉及由完成消费者消耗的某种完成消息,这将释放阻塞的请求线程。 (3认同)
  • 如果“同步调用”意味着您想要等到您感兴趣的所有消费者完成您希望他们根据消息采取的任何操作,那么是的,答案不提供“同步 RPC 机制” 。但它确实解决了 OP 对“ Producer.sendSync ”的请求。 (3认同)

Cam*_*not 7

正如 Thilo 建议的那样,您可以调用Future#get阻止,直到发送完成。但是,您可能会遇到一些性能问题,因为当生产者队列有batch.size元素时、当大小的缓冲区buffer.memory已满或在max.block.ms几毫秒后,生产者开始发送。

如果推送到 kafka 的线程数量有限,则每次都必须等待max.block.ms消息发送。因此在某些情况下,您会更喜欢使用:

// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush producer queue to spare queuing time
producer.flush();
// throw error when kafka is unreachable
future.get(10, TimeUnit.SECONDS);
Run Code Online (Sandbox Code Playgroud)