如何使用Spring Kafka生产者发送批量数据

pde*_*eva 9 java apache-kafka spring-kafka

目前我有这样的代码:

KafkaTemplate<String, String> kafkaTemplate;

List<Pet> myData;

for(Pet p: myData) {
  String json = objectWriter.writeValueAsString(p)
  kafkaTemplate.send(topic, json)
}

Run Code Online (Sandbox Code Playgroud)

所以每个列表项都是一一发送的。如何一次发送整个列表?

Dea*_*ool 7

因此没有直接的方法可以使用KafkaTemplateKafkaProducer直接将批量消息发送到 kafka 。它们没有任何接受List对象并将它们单独发送到不同分区的方法。

kafka生产者如何向kafka发送消息?

Kafka生产者

Kafka 生产者创建一批记录,然后立即发送所有记录,了解更多信息

生产者由一个缓冲空间池和一个后台 I/O 线程组成​​,该缓冲空间保存尚未传输到服务器的记录,该后台 I/O 线程负责将这些记录转换为请求并将其传输到集群。

send() 方法是异步的。调用时,它将记录添加到待处理记录发送的缓冲区中并立即返回。这使得制作者可以将各个记录批处理在一起以提高效率。

异步发送

批处理是效率的重要驱动因素之一,为了启用批处理,Kafka 生产者将尝试在内存中累积数据并在单个请求中发送更大的批次。可以将批处理配置为累积不超过固定数量的消息,并且等待时间不超过某个固定延迟限制(例如 64k 或 10 ms)。这允许累积更多要发送的字节,并且服务器上很少有较大的 I/O 操作。这种缓冲是可配置的,并提供了一种机制来权衡少量的额外延迟以获得更好的吞吐量。

由于您正在使用,因此spring-kafka您可以发送List<Objects>,但在这里您发送JSONArray的是JSONObject而不是每个JSONObject主题分区

public KafkaTemplate<String, List<Object>> createTemplate() {

        Map<String, Object> senderProps = senderProps();
ProducerFactory<Integer, String> pf =
          new DefaultKafkaProducerFactory<String, List<Object>>(senderProps);
        KafkaTemplate<String, List<Object>> template = new KafkaTemplate<>(pf);
return template;

 }

 public Map<String, Object> producerProps() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
       return props;

 }

KafkaTemplate<String, List<Object>> kafkaTemplate;
Run Code Online (Sandbox Code Playgroud)


小智 6

一般来说,设置属性就足够了:

 props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
Run Code Online (Sandbox Code Playgroud)

并使用属性增加批处理缓冲区的大小:

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
Run Code Online (Sandbox Code Playgroud)

根据您的要求。

flush()注意:确保代码中没有方法调用,因为它会丢弃所有批处理设置。