Kafka 是否提供默认的批量大小来从主题读取消息?我有以下代码正在读取主题中的消息。
while (true) {
final ConsumerRecords<String, User> consumerRecords =
consumer.poll(500));
if (consumerRecords.count() == 0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
consumerRecords.forEach(record -> {
User user = record.value();
userArray.add(user);
});
insertInBatch(user)
consumer.commitAsync();
}
consumer.close();
Run Code Online (Sandbox Code Playgroud)
在 insertInBatch 方法中,我将数据保存到数据库中。尽管我在创建 Consumer 时没有指定任何批量大小,但每 500 条记录就会调用此方法。我不认为我的创作方式有什么特别之处。使用 Avro 来发送消息,但我认为这并不重要(?)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("auto.commit.enable", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer",KafkaAvroDeserializer.class.getName());
props.put("schema.registry","http://localhost:8081");
Run Code Online (Sandbox Code Playgroud)
是的,有一个默认值max.poll.records
https://kafka.apache.org/documentation/#consumerconfigs
不过,如果您要插入数据库,那么最好使用 Kafka Connect,而不是编写一个显然没有错误处理的消费者(还?)
| 归档时间: |
|
| 查看次数: |
23448 次 |
| 最近记录: |