Pra*_*nna 6 akka apache-kafka akka-cluster
以下是我的用例
这让我探索了Akka集群的工作分配,路由和扩展.我可以使用Akka"Supervisor"作为Kafka消费者,并根据其分类将传入的工作分配给适当的工作人员.
但我仍然想要了解的是在Akka集群中实施主管和工作人员之间的弹性通信方式的正确方法.因为一旦主管消费来自Kafka的消息,就会承诺Kafka抵消.如果在偏移提交之后的处理中发生了一些错误,那么以下可接受的方法是从上次离开的位置恢复和启动吗?
通过使用Kafka支持的持久邮箱,使主管成为持久性actor.主管将卡夫卡的工作排入队伍,工人从卡夫卡获得工作,并在完成工作后才提供抵消.
正如 Jaakko 所说,这实际上取决于您使用的第三方库。
就我而言,我已经成功使用了Akka Streams Kafka,尽管我确实启用了偏移量自动提交。
但是,这个库可能会满足您的需求,因为它允许您自定义偏移量提交(请参阅外部偏移量存储和Kafka 中的偏移量存储部分)。
文档说:
Consumer.committableSource 可以将偏移位置提交给 Kafka。与自动提交相比,这可以精确控制消息何时被视为已使用。
为了禁用自动提交,您必须application.conf
通过添加以下akka.kafka.consumer
部分来完成 Akka 文件:
akka.kafka.consumer {
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# can be defined in this configuration section.
kafka-clients {
# Disable auto-commit by default
enable.auto.commit = false
}
}
Run Code Online (Sandbox Code Playgroud)
akka-stream-kafka_2.11
(version )的最新版本0.16
与 Akka 兼容2.5.x
,但您必须使用 Akka 工具包之一覆盖 akka-stream_2.11 依赖项。目前,我正在 Akka 中使用这个库2.5.3
,它运行得非常好。
希望您能找到您想要的东西:)
归档时间: |
|
查看次数: |
769 次 |
最近记录: |