避免apache kafka使用者中重复消息的有效策略

Sha*_*s88 37 java message-queue apache-kafka

我一直在学习apache kafka一个月了.然而,我现在陷入了困境.我的用例是,我有两个或更多的消费者进程在不同的机器上运行.我运行了一些测试,其中我在kafka服务器上发布了10,000条消息.然后在处理这些消息时,我杀死了一个消费者进程并重新启动它.消费者在文件中编写处理过的消息.消费完成后,文件显示超过10k条消息.所以有些消息是重复的.

在消费者流程中,我已禁用自动提交.消费者手动批量提交偏移量.因此,例如,如果将100条消息写入文件,则消费者提交偏移量.当单个消费者进程正在运行并且以这种方式避免崩溃并恢复重复时.但是当多个消费者正在运行并且其中一个消失并且恢复时,它会将重复的消息写入文件.

是否有任何有效的策略来避免这些重复的消息?

kuu*_*ujo 30

最简洁的答案是不.

你正在寻找的只是一次处理.虽然它似乎经常是可行的,但它永远不应该依赖,因为总有一些警告.

即使为了尝试防止重复,您也需要使用简单的使用者.这种方法的工作原理是每个消费者,当从某个分区消费消息时,将消费消息的分区和偏移写入磁盘.当消费者在失败后重新启动时,从磁盘读取每个分区的最后消耗的偏移量.

但即使采用这种模式,消费者也无法保证在失败后不会重新处理消息.如果消费者在将偏移量刷新到磁盘之前消耗了消息然后失败了怎么办?如果在处理消息之前写入磁盘,如果编写偏移量然后在实际处理消息之前失败怎么办?即使您在每条消息之后向ZooKeeper提交偏移量,也会存在同样的问题.

但是,在某些情况下,一次性处理更容易实现,但仅适用于某些用例.这只需要将偏移量存储在与单元应用程序输出相同的位置.例如,如果您编写一个计算消息的消费者,通过存储每个计数的最后计算的偏移量,您可以保证偏移量与消费者的状态同时存储.当然,为了保证一次性处理,这将要求您只消耗一条消息并为每条消息更新一次状态,这对于大多数Kafka消费者应用程序来说是完全不切实际的.就其性质而言,Kafka出于性能原因分批消费消息.

通常,如果您只是将其设计为幂等的,那么您的时间会更好,并且您的应用程序将更加可靠.


RaG*_*aGe 26

这就是卡夫卡常见问题解答中关于完全一次的主题:

如何从卡夫卡获得一次完整的消息?

恰好一旦语义有两个部分:避免数据生成期间的重复并避免数据消费期间的重复.

在数据生成期间,有两种方法可以获得完全一次的语义:

  • 每个分区使用一个单一写程序,每次遇到网络错误时,请检查该分区中的最后一条消息,看看上次写入是否成功
  • 在消息中包含主键(UUID或其他内容),并在使用者上进行重复数据删除.

如果您执行其中一项操作,Kafka托管的日志将不会重复.然而,没有重复的阅读也取决于消费者的一些合作.如果消费者定期检查其位置,那么如果它失败并重新启动,它将从检查点位置重新开始.因此,如果数据输出和检查点不是原子写入的,那么也可以在这里获得重复数据.此问题特别适用于您的存储系统.例如,如果您使用的是数据库,则可以在事务中一起提交这些数据库.LinkedIn编写的HDFS加载器Camus为Hadoop加载做了类似的事情.另一个不需要事务的替代方法是使用主题/分区/偏移量组合存储加载数据的偏移量和重复数据删除.

我认为有两个改进可以使这更容易:

  • 通过可选地在服务器上集成对此的支持,可以自动且更便宜地完成生产者幂等性.
  • 现有的高级消费者不会暴露许多更细粒度的抵消控制(例如重置你的位置).我们将尽快完成这项工作


pei*_*han 16

我同意RaGe在消费者方面的重复数据删除.我们使用Redis来重复删除Kafka消息.

假设Message类有一个名为'uniqId'的成员,它由生产者方填充并保证是唯一的.我们使用12长度的随机字符串.(正则表达式'^[A-Za-z0-9]{12}$')

消费者方使用Redis的SETNX进行重复数据删除,并使用EXPIRE自动清除过期的密钥.示例代码:

Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0) {
    log.warn("kafka dup: {}", msg.toJson()); // and other logic
} else {
    jedis.expire(key, 7200); // 2 hours is ok for production environment;
}
Run Code Online (Sandbox Code Playgroud)

当Kafka(版本0.8.x)出现问题时,上面的代码确实多次检测到重复的消息.使用我们的输入/输出余额审计日志,没有消息丢失或重复发生.


Chr*_*row 8

现在 Kafka 中有一个相对较新的“事务 API ”,它可以让您在处理流时实现一次处理。使用事务 API,可以内置幂等性,只要您的系统的其余部分是为幂等性设计的。见https://www.baeldung.com/kafka-exactly-once

  • 这仅适用于生产者使用事务 API 的情况,否则消费者无法从此模式中受益。 (3认同)

小智 5

无论生产者端做了什么,我们认为从 kafka 一次性交付的最佳方式仍然是在消费者端处理它:

  1. 使用 uuid 生成 msg 作为主题 T1 的 Kafka 消息密钥
  2. 消费者端从T1读取msg,以uuid作为rowkey写入hbase
  3. 使用相同的 rowkey 从 hbase 读回并写入另一个主题 T2
  4. 让您的最终消费者实际消费主题 T2