gau*_*rav 70 apache-kafka kafka-consumer-api
KeyedMessage<String, byte[]> keyedMessage = new KeyedMessage<String, byte[]>(request.getRequestTopicName(), SerializationUtils.serialize(message));
producer.send(keyedMessage);
Run Code Online (Sandbox Code Playgroud)
目前,我发送的消息没有任何密钥作为键控消息的一部分,它仍然可以使用delete.retention.ms吗?我是否需要发送密钥作为邮件的一部分?将密钥作为消息的一部分是否合适?
kuu*_*ujo 134
如果您需要对密钥进行强有力的订购并且正在开发类似状态机的东西,则密钥通常是有用/必要的.如果您要求始终以正确的顺序查看具有相同密钥的消息(例如,唯一ID),则将密钥附加到消息将确保具有相同密钥的消息始终转到主题中的同一分区.Kafka保证分区内的顺序,但不保证主题中的分区之间的顺序,因此,不提供密钥 - 这将导致跨分区的循环分配 - 将不会保持这样的顺序.
对于状态机,可以将密钥与log.cleaner.enable一起使用,以使用相同的密钥对条目进行重复数据删除.在这种情况下,Kafka假定您的应用程序只关心给定密钥的最新实例,并且只有在密钥不为空时,日志清理程序才会删除给定密钥的旧副本.这种形式的日志压缩由log.cleaner.delete.retention属性控制,并且需要密钥.
或者,更常见的属性log.retention.hours(默认情况下已启用)通过删除已过期的日志的完整段来工作.在这种情况下,不必提供密钥.Kafka将简单地删除比给定保留期更早的日志块.
这就是说,如果你已经启用了日志压缩或要求对具有相同密钥的消息进行严格的订购,那么你肯定应该使用密钥.否则,null键可以提供更好的分发并防止在某些键可能看起来比其他键更多的情况下潜在的热点问题.
mik*_*ike 39
tl;dr 不,向 Kafka 发送消息不需要密钥。但...
除了非常有帮助的公认答案之外,我还想添加更多细节
默认情况下,Kafka 使用消息的 key 来选择它写入的主题的分区。这是在DefaultPartitioner由
kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
Run Code Online (Sandbox Code Playgroud)
如果没有提供密钥,那么 Kafka 将以循环方式对数据进行分区。
在 Kafka 中,可以通过扩展Partitioner类来创建自己的 Partitioner 。为此,您需要覆盖partition具有签名的方法:
kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
Run Code Online (Sandbox Code Playgroud)
通常,Kafka 消息的键用于选择分区,返回值(类型int)是分区号。如果没有密钥,您需要依赖于处理起来可能要复杂得多的值。
正如给定的答案中所述,Kafka 保证仅在分区级别对消息进行排序。
假设您想将客户的金融交易存储在具有两个分区的 Kafka 主题中。消息可能看起来像 (key:value)
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": -1337}
null:{"customerId": 1, "changeInBankAccount": +200}
Run Code Online (Sandbox Code Playgroud)
由于我们没有定义一个键,这两个分区大概看起来像
// partition 0
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
// partition 1
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": -1337}
Run Code Online (Sandbox Code Playgroud)
您阅读该主题的消费者最终可能会告诉您,该帐户的余额在特定时间为 600,尽管事实并非如此!只是因为它先于分区 1 中的消息读取分区 0 中的所有消息。
使用有意义的键(如 customerId)可以避免这种情况,因为分区将是这样的:
// partition 0
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": -1337}
1:{"customerId": 1, "changeInBankAccount": +200}
// partition 1
2:{"customerId": 2, "changeInBankAccount": +100}
Run Code Online (Sandbox Code Playgroud)
请记住,只有在生产者配置max.in.flight.requests.per.connection设置为 的情况下才能保证分区内的排序1。但是,该配置的默认值是,5它被描述为:
"客户端在阻塞前将在单个连接上发送的最大未确认请求数。请注意,如果将此设置设置为大于 1 并且发送失败,则存在由于重试而导致消息重新排序的风险(即,如果启用重试)。”
您可以在另一篇关于Kafka-Message Ordering Guarantees 的Stackoverflow 帖子中找到更多详细信息。
如果您的消息中没有密钥,您将无法将主题配置设置cleanup.policy为compacted。根据文档“日志压缩确保 Kafka 将始终为单个主题分区的数据日志中的每个消息键至少保留最后一个已知值。”。
如果没有任何键,就无法使用这个漂亮而有用的设置。
在实际用例中,Kafka 消息的密钥会对您的性能和业务逻辑的清晰度产生巨大影响。
例如,一个键可以自然地用于对您的数据进行分区。由于您可以控制您的使用者从特定分区读取,因此可以作为有效的过滤器。此外,该键还可以包含有关消息实际值的一些元数据,以帮助您控制后续处理。键通常小于值,因此解析键而不是整个值更方便。同时,您可以应用所有序列化和模式注册,就像使用您的值也使用键一样。
请注意,还有Header的概念可用于存储信息,请参阅文档。
| 归档时间: |
|
| 查看次数: |
49930 次 |
| 最近记录: |