可靠的即发即忘 Kafka 生产者实施策略

Yur*_*uri 5 messaging message-queue reliability producer apache-kafka

我正面临卡夫卡的第一英里问题。每个人都会处理分区等问题,但如何处理第一英里呢?

我的系统由许多应用程序组成,这些应用程序生成分布在节点上的事件。我需要以可靠/故障安全的方式将这些事件传递给一组充当消费者的应用程序。选择的消息系统是 Kafka(由于其日志性质),但它并不是一成不变的。

事件应该尽可能以解耦的即发即忘的方式传播。这意味着生产者应该对可靠地传递消息负全部责任。这意味着生成事件的应用程序根本不必担心事件传递。

生产者的可靠性模式必须考虑:

  • 盒子连接中断- 在中断期间生产者根本无法访问网络;Kafka集群因此无法访问
  • 框重新启动- 生产者和事件生成应用程序重新启动(独立);生产者应该保留运行中的消息(在重试、批处理等期间)
  • 内部 Kafka 异常-消息大小太大;序列化异常;ETC。

到目前为止,我检查过的库都没有涵盖这些情况。有建议的策略来解决这个问题吗?

我知道Producer 期间存在可重试不可重试的错误send()。对于那些可重试的内容,库通常会在内部处理所有事情。但是,不可重试以异步回调中的异常结束......

我应该盲目地无限重播这些吗?对于网络中断,它应该可以工作,但是 Kafka 内部错误怎么样 - 比如说消息太大。可能有类似DeadLetterQueue的机制+重播。但是,如何处理消息计数......

关于持久性 - 轻量级数据库后端应该可以解决这个问题。只需创建一个持久队列,然后删除那些已经发送/确认的队列。然而,恐怕如果这么简单的话,它很早以前就已经在标准 Kafka 库中实现了。性能可能会下降。

看到像KAFKA-3686KAFKA-1955这样的东西让我有点担心。

提前致谢。

Ami*_*mar 3

我们有一个生产系统,其主要用例是可靠的消息传递。我无法详细介绍,但我可以分享我们如何实现这一目标的高级设计。然而,该系统保证“至少一次传递”消息传递语义。

在此输入图像描述

来源

  • 首先我们设计了一个消息模式,所有发送到这个系统的消息都必须遵循它。
  • 然后我们将消息写入mysql消息表,该表按日期分片,其中一个字段标记为已发送或未发送
  • 我们有一个应用程序不断轮询数据库,其中行标记为未传递,拾取一行,构造消息并将其发送到负载均衡器,这是一个阻塞调用,并更新消息行以传递,仅当返回 200 时5xx 后,应用程序将在睡眠恢复后重试该消息。您还可以根据需要配置重试次数。

每个源系统都维护自己的轮询应用程序和数据库。

生产者阵列

  • 这基本上是负载均衡器下的一组机器等待传入消息并将这些消息生成到 Kafka 集群。
  • 我们为每个主题维护 3 个副本,并在生产者配置中保留 acks = -1 ,这对于您的即发即忘的要求非常重要。根据文档

acks=all 这意味着领导者将等待完整的同步副本集确认记录。这保证了只要至少一个同步副本保持活动状态,记录就不会丢失。这是最强有力的保证。这相当于 acks=-1 设置

  • 正如我所说,生成是一个阻塞调用,如果在所有 3 个副本中成功生成消息,它将返回 2xx。4xx,如果消息不满足架构要求 5xx,如果 kafka 代理抛出一些异常。

消费者数组

  • 这是一个正常的机器阵列,为主题的消费者组运行 Kafka 高级消费者。

目前,我们正在运行此设置,并使用很少的附加组件来实现生产中的其他一些功能流程,从源代码的角度来看,它基本上是一劳永逸的。

该系统解决了您的所有问题。

  1. 盒子连接中断:除非源轮询应用程序获得 2xx,否则它将再次产生,这可能会导致重复。

  2. 盒子重新启动:由于源的重试机制,这也应该不是问题。

  3. 内部 Kafka 异常:由轮询应用程序处理,因为生产者数组将回复 5xx 无法生产,并将进一步重试。

Acks = -1,还确保所有副本都同步并拥有消息的副本,因此代理宕机也不会成为问题。