如何识别哪些消息成功发布到 kafka 主题,哪些消息失败

Yhe*_*der 2 java apache-kafka kafka-producer-api

向 apache kafka 发布消息列表。任何人都可以使用 kafka api 提供示例代码,显示如何识别哪些消息已成功发布到主题,哪些消息从响应中失败?(请注意,我将在一个请求中批量发送消息列表。)

Mic*_*son 5

KafkaProducer.send()方法采用单个 ProducerRecord(消息)。

有两种方法可以检查集群是否成功接收到此消息:

  • 使用回调:send()可以将回调作为第二个参数

    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // If Exception is null, the record was sent successfully
        }
    });
    
    Run Code Online (Sandbox Code Playgroud)
  • 使用未来:send()返回一个Future

     ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
     Future<RecordMetadata> future = producer.send(record);
     try {
         RecordMetadata rm = future.get();
         // The record was sent successfully
     } catch (ExecutionException e) {
         // The record failed
     }
    
    Run Code Online (Sandbox Code Playgroud)