了解 Kafka 消息字节大小

xer*_*ge2 5 java spring apache-kafka kafka-consumer-api kafka-producer-api

如何获取Kafka中单个记录的大小?

有一些关于我为什么需要这个的说明。

这似乎不是 ConsumerRecord 或 RecordMetadata 类上公开的serializedValueSize。我真的不明白这个属性的价值,因为它与对消费者有用的消息的大小不匹配。如果不是这个,serializedValueSize 的用途是什么?

我试图让我的 Kafka java 应用程序表现得像“min.poll.records”(如果它存在以补充“max.poll.records”)。我必须这样做,因为这是必需的:)。假设给定主题上的所有消息都具有相同的大小(在本例中确实如此),这应该可以从消费者端通过将 fetch.min.bytes 设置为等于要批处理的消息量乘以每个消息的字节大小来实现信息。

这存在:

https://kafka.apache.org/documentation/#consumerapi

最大轮询记录数

单次调用 poll() 返回的最大记录数。

这不存在,但这是我想要的行为:

最小投票记录

单次调用 poll() 时返回的最小记录数。如果在 fetch.max.wait.ms 中指定的时间过去之前没有足够的记录可用,则无论如何都会返回记录,因此,这不是绝对最小值。

这是我到目前为止发现的:

  • 在生产者方面,我将“batch.size”设置为 1 字节。这迫使生产者单独发送每条消息。

  • 关于消费者大小,我将“max.partition.fetch.bytes”设置为 291 字节。这使得消费者只能返回 1 条消息。将此值设置为 292 会使消费者有时收到 2 条消息。所以我计算出消息大小是292的一半;一条消息的大小为 146 字节

  • 上述要点需要更改 Kafka 配置,并涉及手动查看/grep 一些服务器日志。如果 Kafka Java API 提供这个值那就太好了。

  • 在生产者方面,Kafka 提供了一种在RecordMetadata.serializedValueSize 方法中获取记录的序列化大小的方法。这个值是76字节,与上面测试中给出的146字节有很大不同。

  • 在消费者规模上,Kafka提供了ConsumerRecord API。该记录的序列化值大小也是 76。偏移量每次仅增加 1(而不是记录的字节大小)。

  • 密钥的大小为-1字节(密钥为空)。

System.out.println(myRecordMetadata.serializedValueSize());
// 76
Run Code Online (Sandbox Code Playgroud)
# producer
batch.size=1

# consumer

# Expected this to work:
# 76 * 2 = 152
max.partition.fetch.bytes=152

# Actually works:
# 292 = ??? magic ???
max.partition.fetch.bytes=292
Run Code Online (Sandbox Code Playgroud)

我预计将 max.partition.fetch.bytes 设置为由 serializedValueSize 给出的字节数的倍数将使 Kafka 消费者从轮询中接收最多该数量的记录。相反,max.partition.fetch.bytes 值需要更高才能发生这种情况。

KWe*_*Wer 5

原答案

我不太熟悉该serializedValueSize方法,但根据文档,这只是该消息中存储的值的大小。这将小于总消息大小(即使带有null键),因为消息还包含不属于值的元数据(例如时间戳)。

至于您的问题:与其通过处理消息大小和限制消费者的吞吐量来直接控制轮询,为什么不只是缓冲传入消息直到有足够的可用消息或所需的超时(您提到过,但您可以手动指定一个fetch.max.wait.ms) )已经过去了?

public static <K, V> List<ConsumerRecord<K, V>>
    minPoll(KafkaConsumer<K, V> consumer, Duration timeout, int minRecords) {
  List<ConsumerRecord<K, V>> acc = new ArrayList<>();
  long pollTimeout = Duration.ofMillis(timeout.toMillis()/10);
  long start = System.nanoTime();
  do {
    ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
    for(ConsumerRecord<K, V> record : records)
      acc.add(record);
  } while(acc.size() < minRecords &&
          System.nanoTime() - start < timeout.toNanos());
  return acc;
}
Run Code Online (Sandbox Code Playgroud)

timeout.toMillis()/10调用中的超时是consumer.poll任意的。您应该选择足够小的持续时间,这样即使我们等待的时间长于指定的超时时间(此处:长 10%)也没关系。

编辑:请注意,这可能会返回一个大于max.poll.records(最大值max.poll.records + minRecords - 1)的列表。如果您还需要强制执行此严格的上限,请使用该方法外部的另一个缓冲区来临时存储多余的记录(这可能会更快,但不允许与minPoll普通poll方法混合),或者干脆丢弃它们并使用consumerseek方法进行回溯。

回答更新的问题

因此,问题不在于控制 - 方法返回的消息数量poll,而在于如何获取单个记录的大小。不幸的是,我认为如果不经历很多麻烦这是不可能的。问题是,对此没有真正的(恒定的)答案,甚至一个大概的答案也将取决于 Kafka 版本或更确切地说取决于不同的 Kafka 协议版本。

首先,我不完全确定max.partition.fetch.bytes到底是什么控制(例如:协议开销是否也是其中的一部分?)。让我解释一下我的意思:当消费者发送获取请求时,获取响应由以下字段组成:

  1. 节流时间(4字节)
  2. 主题响应数组(数组长度 4 个字节 + 数组中数据的大小)。

主题响应依次包括

  1. 主题名称(字符串长度 2 个字节 + 字符串大小)
  2. 分区响应数组(数组长度 4 个字节 + 数组中数据的大小)。

分区响应则有

  1. 分区ID(4字节)
  2. 错误代码(2字节)
  3. 高水印(8字节)
  4. 最后稳定偏移量(8 字节)
  5. 日志起始偏移量(8字节)
  6. 中止事务数组(数组长度 4 个字节 + 数组中的数据)
  7. 记录集。

所有这些都可以在文件中找到FetchResponse.java。记录集又由包含记录的记录批次组成。我不会列出包含记录批次的所有内容(您可以在此处查看)。可以说开销是 61 字节。最后,批次中单个记录的大小有点棘手,因为它使用 varint 和 varlong 字段。它包含

  1. 正文大小(1-5 字节)
  2. 属性(1字节)
  3. 时间戳增量(1-10 字节)
  4. 偏移增量(1-5 字节)
  5. 密钥字节数组(1-5字节+密钥数据大小)
  6. 值字节数组(1-5 个字节 + 值数据大小)
  7. 标头(1-5 字节 + 标头数据大小)。

其源代码在这里。正如您所看到的,您不能简单地将 292 字节除以二来获得记录大小,因为某些开销是恒定的并且与记录数无关。

更糟糕的是,记录不具有恒定大小,即使它们的键和值(和标头)具有恒定大小,因为时间戳和偏移量使用可变长度数据类型存储为与批处理时间戳和偏移量的差异。此外,这只是撰写本文时最新协议版本的情况。对于旧版本,答案将会再次不同,谁知道未来版本中会发生什么。