如何处理Kafka中的各种故障情况

Mah*_*aha 5 apache-kafka kafka-consumer-api

我们面临的问题:

\n\n

在我们的系统中,我们将票证的状态记录到数据库中NEW,并将其放入 kafka 队列中以供进一步处理。处理器从 kafka 队列中挑选这些票证,进行处理并相应地更新状态。我们发现有些票被遗留在NEW状态。所以我们猜测门票是否无法在队列中生成或没有被消耗。

\n\n


\n\n

消息丢失/重复场景(以及其他一些相关点):

\n\n

因此,我开始详尽地挖掘,以了解我们可以通过哪些方式来面对 Kafka 中的消息丢失和重复。下面我列出了我在这篇文章中可以找到的所有可能的消息丢失和重复场景中可以找到的所有可能的消息丢失和重复场景:

\n\n
\n
    \n
  1. 处理所有副本的不同方法如何导致数据丢失\n \n
      \n
    1. 通过等待领导者上线来处理
      \n 所有副本关闭和领导者上线之间发送的消息都会丢失。
    2. \n
    3. 一旦新的代理上线,就通过选举它作为领导者来处理
      \n 如果新的代理与前一个领导者不同步,则在该代理关闭的时间和当选为新的领导者之间\n 写入的所有数据都将被\n丢失的。当其他代理恢复时,他们将发现自己提交了新领导者上不存在的\n消息,并删除这些消息。
    4. \n
  2. \n
  3. 当领导者宕机而其他副本可能正常运行时,如何会发生数据丢失
    \n 在这种情况下,Kafka 控制器将检测领导者的丢失,并从同步副本池中选举一个新的领导者。这可能需要几秒钟的时间,并导致客户端出现 LeaderNotAvailable 错误。但是,只要生产者和消费者处理这种可能性并适当重试,就不会发生数据丢失。

  4. \n
  5. 当消费者可能错过消费消息时
    \n 如果 Kafka 配置为保留消息一天,而消费者宕机时间超过一天,消费者就会丢失消息。

  6. \n
  7. 评估消费者一致性的不同方法

    \n\n
      \n
    1. 当消费者配置为最多接收每条消息一次时,可能不会处理消息
    2. \n
    3. 当消费者配置为至少接收每条消息一次时,消息可能会被复制/处理两次
    4. \n
    5. 如果消费者配置为仅接收每条消息一次,则不会多次处理消息或未处理消息。
    6. \n
  8. \n
  9. 只要您向一个分区进行生产并从一个分区进行消费,Kafka 就会提供以下保证。如果您使用两个使用者从同一分区读取数据或使用两个生产者写入同一分区,则所有保证都将关闭。\n Kafka对数据的一致性和可用性做出以下保证:

    \n\n
      \n
    1. 发送到主题分区的消息将按照发送顺序附加到提交日志中,
    2. \n
    3. 单个消费者实例将按照消息在日志中出现的顺序看到消息,
    4. \n
    5. 当所有同步副本已将消息应用到其日志时,消息为 \xe2\x80\x98comfilled\xe2\x80\x99,并且
    6. \n
    7. 只要同步副本中至少有一个处于活动状态,任何已提交的消息都不会丢失。
    8. \n
  10. \n
\n
\n\n


\n\n

我想出的方法:

\n\n

读了几篇文章后,我觉得我应该做到以下几点:

\n\n
\n
    \n
  1. 如果消息未排队,生产者应重新发送
    \n 为此生产者应侦听发送的每条消息的确认。如果没有收到确认,它可以重试发送消息

  2. \n
  3. 生产者应该与回调异步:
    \n 如上一个示例中所述

  4. \n
  5. 如何在生产者重试发送的情况下避免重复
    \n 要避免队列中的重复,请enable.idempotence=true在生产者配置中进行设置。这将使生产者确保发送每条消息的一份副本。这需要在生产者上设置以下属性:

    \n\n
      \n
    • max.in.flight.requests.per.connection<=5
    • \n
    • retries>0
    • \n
    • acks=all(当所有broker都提交消息后获取ack)
    • \n
  6. \n
  7. 生产者应该是事务性的
    \n 正如这里所解释的。

    \n\n
      \n
    • 将事务 ID 设置为唯一 ID:

      \n\n
       producerProps.put("transactional.id", "prod-1"); \n
      Run Code Online (Sandbox Code Playgroud)\n\n

      因为我们已经启用了idempotence,Kafka 将使用此事务 ID 作为其算法的一部分来删除此生产者发送的任何消息的重复数据,从而确保幂等性。

    • \n
    • 使用事务语义:init、begin、commit、close
      \n 正如这里所解释的:

      \n\n
      producer.initTransactions(); \ntry { \n    producer.beginTransaction(); \n    producer.send(record1); \n    producer.send(record2); \n    producer.commitTransaction(); \n} catch(ProducerFencedException e) { \n    producer.close(); \n} catch(KafkaException e) { \n    producer.abortTransaction(); \n} \n
      Run Code Online (Sandbox Code Playgroud)
    • \n
  8. \n
  9. 消费者应该是交易型的

    \n\n
      consumerProps.put("isolation.level", "read_committed"); \n
    Run Code Online (Sandbox Code Playgroud)\n\n

    这确保消费者在事务完成之前不会读取任何事务消息。

  10. \n
  11. 手动提交消费者中的偏移量
    \n 如此处所述

    \n\n
      \n
    • 以原子方式处理记录并保存偏移量
      \n 可以说以原子方式将记录处理输出和偏移量保存到任何数据库。为此,我们需要将数据库连接的自动提交设置为 false,并在持久化处理输出和偏移量后手动提交。这也需要设置enable.auto.commitfalse.

    • \n
    • 从数据库读取初始偏移量(例如从缓存恢复后读取)
      \n 查找消费者到此偏移量,然后从该位置读取。

    • \n
  12. \n
\n
\n\n


\n\n

我有疑问:

\n\n

(有些疑问可能是主要的,可以通过实现代码来解决。但我想听听经验丰富的 kafka 开发人员的话。)

\n\n
    \n
  1. 消费者是否只需要从数据库中读取偏移量以用于初始(/消费者恢复后的第一个)读取或所有读取?我觉得它只需要在重新启动时从数据库读取偏移量,如此处所述

  2. \n
  3. 我们必须选择手动分区吗?这种方法是否仅在自动分区关闭的情况下有效?我有这个疑问,因为这个例子解释了通过显式指定分区在 MySQL 中存储偏移量。

  4. \n
  5. 我们是否需要两者:生产者端 kafka 事务和消费者端数据库事务(用于自动存储偏移量和处理记录)?我觉得对于生产者幂等性,我们需要生产者有唯一的事务ID,为此我们需要使用kafka事务API(init、begin、commit)。而作为对应方,消费者也需要设置isolation.levelread_committed。但是,在不使用kafka事务的情况下,我们能保证不丢失消息、不重复处理吗?或者它们是绝对必要的?

  6. \n
  7. 我们是否应该按照上面和此处的说明将偏移量保留到外部数据库,或者按照此处的说明将偏移量发送到事务 (我也没有明白将偏移量发送到事务到底意味着什么)\n
    遵循此处解释的同步异步提交组合。

  8. \n
  9. 我认为消息丢失/重复场景 1 和 2 是通过我上面解释的方法的第 1 点到第 4 点来处理的。

  10. \n
  11. 我觉得消息丢失/重复场景 3 是由我上面解释的方法的第 6 点处理的。

  12. \n
  13. 我们如何实现消息丢失/重复场景 4 中所述的不同消费者一致性方法?它们是否有任何配置或者需要在消费者内部的自定义逻辑中实现?

  14. \n
  15. 消息丢失/重复场景 5 说:“只要您向一个分区生产并从一个分区消费,Kafka 就会提供以下保证。”?在构建正确的系统时需要关注什么吗?

  16. \n
  17. 我上面提出的方法中是否有任何考虑是不必要/多余的?我还遗漏了任何必要的考虑吗?我是否错过了任何消息丢失/重复的情况?

  18. \n
  19. 他们是否有任何其他标准/推荐/更好的方法来确保没有消息丢失和重复处理比我上面想到的?

  20. \n
  21. 我是否必须使用 kafka API 实际编写上述方法?或者是否有任何构建在 kafka API 之上的高级 API 可以轻松确保没有消息丢失和重复处理?

  22. \n
  23. 看看我们面临的问题(如一开始所述),我们在考虑是否可以从 kafka 存储消息的文件中恢复任何丢失/未处理的消息。但这是不正确的,对吧?

  24. \n
\n\n

(对于如此详尽的帖子感到非常抱歉,但我想写一个问题,将在一个地方提出所有相关问题,从而构建如何围绕 kafka 构建系统的大局。)

\n