Kafka Consumer 是否默认批量大小?

Jim*_*Jim 6 java apache-kafka

Kafka 是否提供默认的批量大小来从主题读取消息?我有以下代码正在读取主题中的消息。

  while (true) {
        final ConsumerRecords<String, User> consumerRecords =
                consumer.poll(500));
        if (consumerRecords.count() == 0) {
            noRecordsCount++;
            if (noRecordsCount > giveUp) break;
            else continue;
        }
        consumerRecords.forEach(record -> {
            User user = record.value();
            userArray.add(user);
        });

        insertInBatch(user)
        consumer.commitAsync();
    }
    consumer.close();
Run Code Online (Sandbox Code Playgroud)

在 insertInBatch 方法中,我将数据保存到数据库中。尽管我在创建 Consumer 时没有指定任何批量大小,但每 500 条记录就会调用此方法。我不认为我的创作方式有什么特别之处。使用 Avro 来发送消息,但我认为这并不重要(?)

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("auto.commit.enable", "false");
    props.put("auto.offset.reset", "earliest");

    props.put("key.serializer",StringSerializer.class.getName());
    props.put("value.serializer",KafkaAvroDeserializer.class.getName());
    props.put("schema.registry","http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)

cri*_*007 6

是的,有一个默认值max.poll.records

https://kafka.apache.org/documentation/#consumerconfigs

不过,如果您要插入数据库,那么最好使用 Kafka Connect,而不是编写一个显然没有错误处理的消费者(还?)

  • 请注意,“max.poll.records”仅适用于消费者端。在幕后,从代理获取的批次大小由“max.partition.fetch.bytes”、“fetch.min.bytes”和“fetch.max.bytes”控制。 (4认同)