Kafka随机访问日志

vla*_*t3k 1 apache-kafka

我正在尝试实现一种通过使用 KafkaConsumer.assign(partition)、KafkaConsumer.seek(partition, offset) 随机访问来自 Kafka 的消息的方法。然后读取一条消息的轮询。

但在这种情况下我每秒无法收到超过 500 条消息。相比之下,如果我“订阅”该分区,我每秒会收到 100,000+ 条消息。(@1000 字节消息大小)

我试过了:

  1. Broker、Zookeeper、Consumer在同一主机上和不同主机上。(不使用复制)
  2. 1 和 15 分区
  3. “server.properties”中的默认线程配置并增加到 20 个(io 和网络)
  4. 每次分配给不同分区的单个消费者,每个分区一个消费者
  5. 单线程消费和多线程消费(调用多个不同的消费者)
  6. 添加两个代理和一个新主题及其在两个代理上的分区
  7. 启动多个 Kafka Consumer 进程
  8. 更改消息大小 5k、50k、100k -

在所有情况下,我得到的最小值约为 200 条消息/秒。如果我使用 2-3 个线程,则最大值为 500。但上面的操作使得“.poll()”调用花费的时间越来越长(从单线程的 3-4 毫秒到 10 个线程的 40-50 毫秒)。

我天真的 kafka 理解是消费者打开与代理的连接并发送请求以检索其日志的一小部分。虽然所有这些都会涉及一些延迟,并且检索一批消息会好得多 - 我想它会随着涉及的接收器数量的增加而扩展,但代价是增加运行消费者的虚拟机和运行代理的虚拟机。但两人都在闲置。

显然,代理端发生了一些同步,但我无法弄清楚这是否是由于我使用 Kafka 或使用 .seek 的一些固有限制所致

我希望得到一些关于我是否应该尝试其他方法的提示,或者这就是我所能得到的。

小智 5

Kafka 在设计上就是一个流媒体平台。这意味着已经开发了很多很多东西来加速顺序访问。批量存储消息只是一回事。当你使用Kafka时poll(),你就以这种方式使用Kafka,Kafka会尽力而为。随机访问是Kafka 没有设计的东西。

如果您想要快速随机访问分布式大数据,您可能需要其他东西。例如,像 Cassandra 这样的分布式数据库或像 Hazelcast 这样的内存系统。
此外,您可能希望将 Kafka 流转换为另一种流,这样您就可以使用顺序方式。