Kafka消费者(0.8.2.2)可以批量阅读消息吗?

shi*_*455 5 java apache-kafka kafka-consumer-api

根据我的理解,Kafka消费者按顺序从指定的分区读取消息...

我们计划拥有多个Kafka使用者(Java),它们具有相同的组I ..如果它从指定的分区顺序读取,那么我们如何实现高吞吐量..例如,Producer发布消息,如每秒40个.消费者流程每秒消息1 ..虽然我们可以有多个消费者,但不能有40 rt ??? 如我错了请纠正我...

在我们的情况下,消费者只有在消息成功处理后才能提交偏移..这些消息将被重新处理......有没有更好的解决方案???

Mor*_*yon 11

根据您的问题澄清.

Kafka Consumer可以一次读取多条消息.但是Kafka Consumer并不真正读取消息,更正确的说消费者读取一定数量的字节然后根据各个消息的大小来确定将读取多少消息.通过Kafka Consumer Configs读取,您不能指定要获取的消息数,您可以指定消费者可以获取的最大/最小数据大小.但是,在该范围内的许多消息是您将获得多少消息.正如您所指出的那样,您将始终按顺序获取消息.

相关消费者配置(0.9.0.0及更高版本)

  • fetch.min.bytes
  • max.partition.fetch.bytes

UPDATE

在评论中使用您的示例,"我的理解是,如果我在配置中指定读取10个字节,并且如果每个消息是2个字节,则消费者一次读取5个消息." 那是真实的.你的下一个声明,"这意味着这5条消息的偏移在分区中是随机的",这是错误的.阅读顺序并不意味着一个接一个,它只是意味着它们仍然是有序的.您可以批量处理项目并使其保持顺序/有序.请看以下示例.

在Kafka日志中,如果有10条消息(每2个字节)具有以下偏移量,则为[0,1,2,3,5,5,7,8,9].

如果您读取10个字节,您将获得一个包含偏移量[0,1,2,3,4]的消息的批处理.

如果您读取6个字节,您将获得一个包含偏移量[0,1,2]的消息的批处理.

如果您读取6个字节,然后再读取6个字节,您将获得包含消息[0,1,2]和[3,4,5]的两个批次.

如果你读取8个字节,然后是4个字节,你将得到两个包含消息[0,1,2,3]和[4,5]的批次.

更新:澄清提交

我不是100%确定如何工作,我主要是在Storm环境中与Kafka合作.提供的KafkaSpout会自动提交Kafka消息.

但是看看0.9.0.1 Consumer API,我建议你这样做.似乎有三种与本讨论相关的方法.

  • 轮询(长时间超时)
  • commitSync()
  • commitSync(java.util.Map偏移量)

poll方法检索消息,可能只有1,可能是20,对于你的例子,可以说3条消息被返回[0,1,2].你现在有这三条消息.现在由您决定如何处理它们.您可以处理它们0 => 1 => 2,1 => 0 => 2,2 => 0 => 1,它只是取决于.但是你处理它们,在处理之后你会想要提交告诉Kafka服务器你已完成这些消息.

使用commitSync()提交上次轮询时返回的所有内容,在这种情况下,它将提交偏移量[0,1,2].

另一方面,如果选择使用commitSync(java.util.Map偏移),则可以手动指定要提交的偏移量.如果您按顺序处理它们,则可以处理偏移量0然后提交它,处理偏移量1然后提交它,最后处理偏移量2并提交.

总而言之,Kafka为您提供了处理消息如何渴望的自由,您可以选择按顺序或完全随机地处理它们.

  • 如果有3条消息,每3个字节,您将消耗3条消息.如果有一个3字节消息和一个8字节消息,则只消耗第一条消息,因为3 + 8 = 11不适合10. (2认同)