小编xer*_*ge2的帖子

了解 Kafka 消息字节大小

如何获取Kafka中单个记录的大小?

有一些关于我为什么需要这个的说明。

这似乎不是 ConsumerRecord 或 RecordMetadata 类上公开的serializedValueSize。我真的不明白这个属性的价值,因为它与对消费者有用的消息的大小不匹配。如果不是这个,serializedValueSize 的用途是什么?

我试图让我的 Kafka java 应用程序表现得像“min.poll.records”(如果它存在以补充“max.poll.records”)。我必须这样做,因为这是必需的:)。假设给定主题上的所有消息都具有相同的大小(在本例中确实如此),这应该可以从消费者端通过将 fetch.min.bytes 设置为等于要批处理的消息量乘以每个消息的字节大小来实现信息。

这存在:

https://kafka.apache.org/documentation/#consumerapi

最大轮询记录数

单次调用 poll() 返回的最大记录数。

这不存在,但这是我想要的行为:

最小投票记录

单次调用 poll() 时返回的最小记录数。如果在 fetch.max.wait.ms 中指定的时间过去之前没有足够的记录可用,则无论如何都会返回记录,因此,这不是绝对最小值。

这是我到目前为止发现的:

  • 在生产者方面,我将“batch.size”设置为 1 字节。这迫使生产者单独发送每条消息。

  • 关于消费者大小,我将“max.partition.fetch.bytes”设置为 291 字节。这使得消费者只能返回 1 条消息。将此值设置为 292 会使消费者有时收到 2 条消息。所以我计算出消息大小是292的一半;一条消息的大小为 146 字节

  • 上述要点需要更改 Kafka 配置,并涉及手动查看/grep 一些服务器日志。如果 Kafka Java API 提供这个值那就太好了。

  • 在生产者方面,Kafka 提供了一种在RecordMetadata.serializedValueSize 方法中获取记录的序列化大小的方法。这个值是76字节,与上面测试中给出的146字节有很大不同。

  • 在消费者规模上,Kafka提供了ConsumerRecord API。该记录的序列化值大小也是 76。偏移量每次仅增加 1(而不是记录的字节大小)。

  • 密钥的大小为-1字节(密钥为空)。

System.out.println(myRecordMetadata.serializedValueSize());
// 76
Run Code Online (Sandbox Code Playgroud)
# producer
batch.size=1

# consumer

# Expected this to work:
# 76 * 2 …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka kafka-consumer-api kafka-producer-api

5
推荐指数
1
解决办法
7410
查看次数

为什么我的 Kafka 消费者民意调查这么快?

我的 Kafka 消费者的轮询速度比我预期的要快。我可以更改一些配置以使其一直等待吗fetch.max.wait.ms

我将 fetch.max.wait.ms 设置为一定秒数 (5)。我设置fetch.min.bytes为一些较大的字节数(99,988,800)。

我阅读了文档(但可能遗漏了一些内容):

https://kafka.apache.org/documentation/

  • 获取最小字节数

  • 服务器应为获取请求返回的最小数据量。如果可用数据不足,请求将等待积累足够多的数据,然后再答复请求。默认设置为 1 字节,意味着只要有一个字节的数据可用,或者获取请求在等待数据到达时超时,就会立即应答获取请求。将其设置为大于 1 将导致服务器等待大量数据的积累,这可以稍微提高服务器吞吐量,但会带来一些额外的延迟。

  • fetch.max.wait.ms

  • 如果没有足够的数据来立即满足 fetch.min.bytes 给出的要求,则服务器在应答提取请求之前将阻塞的最长时间。

fetch.max.wait.ms=5000,
fetch.min.bytes=99988800
Run Code Online (Sandbox Code Playgroud)

根据我的配置选项和数据集,我希望调用poll在返回任何记录之前始终阻塞 5 秒。

相反,有时调用会poll在不到一秒的时间内解决,并且总是有一些少量记录。

以下是示例运行的输出:

// send 100 records
// doesn't matter how

// timestamp -> records received
// (date, hour and minute are not shown, just the relevant seconds.millis)

32.475 -> 10
33.392 -> 12
34.116 -> 16
37.477 -> 16
38.395 -> 18
39.118 -> 17
42.479 -> 7 …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api spring-kafka

5
推荐指数
1
解决办法
3435
查看次数

如何使用工厂为特定主题配置 Spring Kafka Listener?

我希望能够通过属性读取主题,而无需在 Kafka 侦听器注释上指定任何内容。不使用 Spring Boot。

我尝试通过“主题”键直接从属性对象读取主题。这给出了一个错误:IllegalStateException:topics, topicPattern, or topicPartitions must be provided.

// some class
@KafkaListener
public void listener(List<String> messages) {
  System.out.print(messages);
}

//some other class
@Bean
public ConsumerFactory<String, String> consumerFactory(Properties topicProp) {
  return new DefaultKafkaConsumerFactory(topicProp);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

  Properties prop = new Properties();
  prop.setProperty("topics", "my-custom-topic");

  factory.setConsumerFactory(this.consumerFactory(prop));
  return factory;
}

Is this possible?
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka kafka-consumer-api spring-kafka

0
推荐指数
1
解决办法
2876
查看次数