我们有Kafka设置,可以通过多个服务器并行处理消息.但每条消息必须只处理一次(并且只能由一台服务器处理).我们已经启动并运行它并且工作正常.
现在,我们面临的问题是Kafka消费者分批阅读消息以获得最大效率.如果/当处理失败,服务器关闭或其他什么时,这会导致问题,因为这样我们就会丢失即将处理的数据.
有没有办法让消费者一次只读取消息让Kafka保留未处理的消息?就像是; 消费者在完成后拉出一条消息 - >进程 - >提交偏移,重复.使用Kafka这是可行的吗?有什么想法/想法吗?
谢谢!
您提到只有一个处理,但您担心会丢失数据。我假设您只是担心其中一台服务器发生故障时的边缘情况?你会丢失数据吗?
我认为没有办法一次完成一条消息。查看消费者配置,似乎只有一个选项用于设置消费者可以从 Kafka 获取的最大字节数,而不是消息数量。
fetch.message.max.bytes
Run Code Online (Sandbox Code Playgroud)
但是,如果您担心完全丢失数据,如果您从未提交偏移量,Kafka 不会将其标记为已提交,并且不会丢失。阅读有关传递语义的 Kafka 文档,
因此,Kafka 默认保证至少一次交付,并允许用户通过禁用生产者重试并在处理一批消息之前提交其偏移量来实现最多一次交付。一次性交付需要与目标存储系统合作,但 Kafka 提供了偏移量,这使得实现这一点变得简单。
因此Kafka默认不支持Exactly-once处理。每当您将处理的输出写入存储时,它都要求您实现存储偏移量。
但这可以通过简单地让消费者将其偏移量存储在与其输出相同的位置来处理得更简单和普遍......作为一个例子,我们在 HDFS 中填充数据的 Hadoop ETL 将其偏移量及其数据存储在 HDFS 中读取,以便保证数据和偏移量要么都更新,要么都不更新。
我希望这有帮助。
归档时间: |
|
查看次数: |
11987 次 |
最近记录: |