小编Jim*_*Jim的帖子

Kafka Consumer 是否默认批量大小?

Kafka 是否提供默认的批量大小来从主题读取消息?我有以下代码正在读取主题中的消息。

  while (true) {
        final ConsumerRecords<String, User> consumerRecords =
                consumer.poll(500));
        if (consumerRecords.count() == 0) {
            noRecordsCount++;
            if (noRecordsCount > giveUp) break;
            else continue;
        }
        consumerRecords.forEach(record -> {
            User user = record.value();
            userArray.add(user);
        });

        insertInBatch(user)
        consumer.commitAsync();
    }
    consumer.close();
Run Code Online (Sandbox Code Playgroud)

在 insertInBatch 方法中,我将数据保存到数据库中。尽管我在创建 Consumer 时没有指定任何批量大小,但每 500 条记录就会调用此方法。我不认为我的创作方式有什么特别之处。使用 Avro 来发送消息,但我认为这并不重要(?)

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("auto.commit.enable", "false");
    props.put("auto.offset.reset", "earliest");

    props.put("key.serializer",StringSerializer.class.getName());
    props.put("value.serializer",KafkaAvroDeserializer.class.getName());
    props.put("schema.registry","http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)

java apache-kafka

6
推荐指数
1
解决办法
2万
查看次数

标签 统计

apache-kafka ×1

java ×1