Kafka 中消息的大小开销是多少?

Ash*_*sle 2 java apache-kafka

假设原始消息大小为 500 字节(在发送到 Kafka 之前)。那么消息发送到Kafka后的大小是多少呢?如果我们使用任何压缩会怎样?

附加信息:我将一个大小为 2048 字节的 ByteBuffer 放入一个主题(具有单个分区),没有任何键。

Topic name: ub3
Path: /data/kafka-logs/ub3-0

[hdpusr@hdpdev2 ub3-0]$ $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hdpdev2:8092 --topic ub3 --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
184
[hdpusr@hdpdev2 ub3-0]$ du -sh *
10M     00000000000000000000.index
448K    00000000000000000000.log
10M     00000000000000000000.timeindex
4.0K    leader-epoch-checkpoint
[hdpusr@hdpdev2 ub3-0]$
[hdpusr@hdpdev2 ub3-0]$
[hdpusr@hdpdev2 ub3-0]$ $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hdpdev2:8092 --topic ub3 --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
86284
[hdpusr@hdpdev2 ub3-0]$ du -sh *
10M     00000000000000000000.index
256M    00000000000000000000.log
10M     00000000000000000000.timeindex
4.0K    leader-epoch-checkpoint
[hdpusr@hdpdev2 ub3-0]$


[hdpusr@hdpdev2 ub3-0]$ $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hdpdev2:8092 --topic ub3 --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
172405
[hdpusr@hdpdev2 ub3-0]$ du -sh *
10M     00000000000000000000.index
512M    00000000000000000000.log
10M     00000000000000000000.timeindex
4.0K    leader-epoch-checkpoint
[hdpusr@hdpdev2 ub3-0]$



[hdpusr@hdpdev2 ub3-0]$ $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hdpdev2:8092 --topic ub3 --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
258491
[hdpusr@hdpdev2 ub3-0]$ du -sh *
10M     00000000000000000000.index
596M    00000000000000000000.log
10M     00000000000000000000.timeindex
4.0K    leader-epoch-checkpoint
[hdpusr@hdpdev2 ub3-0]$



[hdpusr@hdpdev2 ub3-0]$ $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hdpdev2:8092 --topic ub3 --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
344563
[hdpusr@hdpdev2 ub3-0]$ du -sh *
10M     00000000000000000000.index
1.1G    00000000000000000000.log
10M     00000000000000000000.timeindex
4.0K    leader-epoch-checkpoint
[hdpusr@hdpdev2 ub3-0]$
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

mad*_*ead 5

简短的回答是:谁知道呢?

\n\n

但让我们尝试找出一些数字。我已经使用本指南在 Docker 中启动了 Kafka 。然后,写了一个简单的生产者:

\n\n
public class App {\n    public static void main(String[] args) throws Exception {\n        final Producer<String, byte[]> producer = producer();\n\n        producer.send(\n                new ProducerRecord<>(\n                        "test",\n                        key(),\n                        value()\n                )\n        ).get();\n    }\n\n    private static Producer<String, byte[]> producer() {\n        final Properties props = new Properties();\n\n        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");\n        props.put(ProducerConfig.CLIENT_ID_CONFIG, "so57472830");\n        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());\n        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());\n\n        return new KafkaProducer<>(props);\n    }\n\n    private static String key() {\n        return UUID.randomUUID().toString();\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

localhost:9092因此,将使用等于主题的客户端so57472830ID发送test。有效负载是字节数组,键是字符串 UUID。正如您稍后将看到的,所有这些值(主机:端口除外)都会产生“开销”。在这里,我认为开销就是除了消息有效负载本身之外的所有内容。

\n\n

让我们从“你好,世界!”开始:

\n\n
private static byte[] value() {\n    return "Hello, world!".getBytes();\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

运行应用程序并捕获到 的流量localhost:9092。我为此使用了WireShark 。

\n\n

\n\n

在这里我找到了带有有效负载的消息。让我们看看整个 TCP 流(WireShark 中的“跟随 TCP 流”):

\n\n

\n\n

因此,整个流占用了 527 个字节,其中客户端发送了 195 个字节(用玫瑰色突出显示):

\n\n

\n\n

(这也意味着 Kafka 发送 527 - 195 == 332 字节作为响应):

\n\n

\n\n

我们的有效负载是 13 个字节。正如您所注意到的,出站流量包含两次客户端 ID(2 \xc3\x97 10 字节)和消息密钥(16 字节)。因此,在 195 个字节中发送 146 个字节是个谜(可能是您在问题中将其命名为“开销”的字节)。

\n\n

让我们发送 500 个随机字节:

\n\n
private static byte[] value() {\n    final byte[] result = new byte[500];\n\n    new Random().nextBytes(result);\n\n    return result;\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

出站流量为 684 字节(整个对话花费了 1016 字节):

\n\n

\n\n

同样,服务器发送 332 字节作为响应,出站神秘(开销)由 684 - (500 + 2 \xc3\x97 10 + 16) = 164 字节组成!

\n\n

所有这些数字都不是最终的,可能会随着生产者版本或特定配置设置而变化。您提到过,其中之一是压缩。让我们来看看。请注意,压缩取决于数据。随机字节比常量字节更难压缩,因为它们具有更多的熵。因此,让我们通过 GZIP 压缩发送 500 个重复字节。如果没有压缩,数字是相同的:

\n\n

\n\n

添加props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");producer()方法并更改value()

\n\n
private static byte[] value() {\n    final byte[] result = new byte[500];\n\n    Arrays.fill(result, (byte) \'a\');\n\n    return result;\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

当启用压缩时,消息(key和value,而不是客户端id和topic)被压缩,出站流量只有208字节:

\n\n

\n\n

我想说,开销与上面的示例大致相同,压缩会影响消息本身的大小。

\n\n

这一切都适用于流量,但在编辑后我发现您对存储大小感兴趣。尽管如此,我想说答案是一样的:“谁知道呢”。这些数字绝对取决于您的配置。

\n