Kafka 设置从主题读取的最大消息数

lea*_*man 5 java memory apache-kafka kafka-consumer-api

我是 Apache Kafka 的新手,正在探索 SimpleConsumer 以读取来自主题的消息。

我使用下面的一段代码来做同样的事情,

FetchRequestBuilder builder = new FetchRequestBuilder();
FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, 1024).build();
FetchResponse fetchResponse;
try {
     fetchResponse = consumer.fetch(fetchRequest);
 } catch (Exception e) {}
Run Code Online (Sandbox Code Playgroud)

这会读取特定分区中的所有可用消息;我想设置要读取的最大消息数。在这个阶段有没有办法做到这一点?当队列中有大量消息时,我不希望所有消息都登陆 JVM 堆。

另一个问题,

以下代码返回一个 ByteBufferMessageSet。

fetchResponse.messageSet(topic, partitionId);
Run Code Online (Sandbox Code Playgroud)

这是否意味着,并非所有可用消息实际上都在内存中?

小智 3

虽然您无法限制消息数量,但您可以限制每个请求每个主题分区接收的字节数。但是,这应该作为配置设置完成,而不是作为消费者实现代码的一部分。Kafka消费者配置文档说您可以指定读取的最大字节数为socket.receive.buffer.bytes. 这应该允许您更细粒度地控制 Kafka 消息在 JVM 堆中占用的确切空间。请注意,该值必须等于或大于代理上的最大消息大小,否则生产者可能会发送太大而无法使用的消息。