卡夫卡和阿卡集群

Pra*_*nna 6 akka apache-kafka akka-cluster

以下是我的用例

  1. 一堆应用程序在Kafka中根据不同主题排列消息.
  2. 让每个主题的消费者将工作分配给集群中的工作者.工作可分为长时间运行,内存密集型,简单等,并相应地选择工作人员.

这让我探索了Akka集群的工作分配,路由和扩展.我可以使用Akka"Supervisor"作为Kafka消费者,并根据其分类将传入的工作分配给适当的工作人员.

但我仍然想要了解的是在Akka集群中实施主管和工作人员之间的弹性通信方式的正确方法.因为一旦主管消费来自Kafka的消息,就会承诺Kafka抵消.如果在偏移提交之后的处理中发生了一些错误,那么以下可接受的方法是从上次离开的位置恢复和启动吗?

通过使用Kafka支持的持久邮箱,使主管成为持久性actor.主管将卡夫卡的工作排入队伍,工人从卡夫卡获得工作,并在完成工作后才提供抵消.

Ant*_*ine 3

正如 Jaakko 所说,这实际上取决于您使用的第三方库。

就我而言,我已经成功使用了Akka Streams Kafka,尽管我确实启用了偏移量自动提交。

但是,这个库可能会满足您的需求,因为它允许您自定义偏移量提交(请参阅外部偏移量存储Kafka 中的偏移量存储部分)。

文档说:

Consumer.committableSource 可以将偏移位置提交给 Kafka。与自动提交相比,这可以精确控制消息何时被视为已使用。

为了禁用自动提交,您必须application.conf通过添加以下akka.kafka.consumer部分来完成 Akka 文件:

akka.kafka.consumer {

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.

  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

}
Run Code Online (Sandbox Code Playgroud)

akka-stream-kafka_2.11(version )的最新版本0.16与 Akka 兼容2.5.x,但您必须使用 Akka 工具包之一覆盖 akka-stream_2.11 依赖项。目前,我正在 Akka 中使用这个库2.5.3,它运行得非常好。

希望您能找到您想要的东西:)