了解kafka生产者的max.inflight属性

Que*_*eff 8 configuration producer apache-kafka kafka-producer-api

我在1.0.0-cp1版本的Kafka集群的工作台上工作.

在我的工作室的一部分,他们专注于订购保证和没有数据丢失的最大吞吐量(只有一个分区的主题),需要我将max.in.flight.requests.per.connection属性设置为1

我已经读过这篇文章

了解我只需要将max.in.flight设置为1,如果我使用该retries属性启用我的制作人的重试功能.

要问我的问题的另一种方法:只有一个分区+重试= 0(生产者道具),足以保证在卡夫卡的顺序?

我需要知道,因为增加max.in.flight会大大增加吞吐量.

Mic*_*son 12

您的用例有点不清楚.您提到了排序,没有数据丢失,但没有指定您是否容忍重复的消息.如果你想要至少一次(QoS 1)或完全一次,那就不洁净了

无论哪种方式,当您使用1.0.0并且仅使用单个分区时,您应该查看Idempotent Producer而不是调整Producer配置.它允许正确有效地保证订购并且不会丢失数据.

从文档:

幂等传递可确保在单个生产者的生命周期内将消息一次性传递到特定主题分区.

早期的幂等生产者强迫max.in.flight.requests.per.connection为1(出于你提到的相同原因),但在最新版本中,它现在可以使用max.in.flight.requests.per.connection设置为最多5并仍然保持其保证.

使用Idempotent Producer,您不仅可以获得更强的传递语义(完全一次而不是至少一次),但它甚至可能表现更好!

我建议您检查文档中的交付语义

回到你的问题

是没有幂等(或事务)生成器,如果要避免数据丢失(QoS 1)并保留排序,则必须设置max.in.flight.requests.per.connection为1,允许retries和使用acks=all.如您所见,这会带来显着的性能成本.

  • 如果您想保留消息顺序,我不确定您是否可以将max.in.flight.requests.per.connection设置为5.如果一条消息被拒绝并需要重试,但在中间时间内已分派第二条消息并将其写入主题,则第一条消息将在第二条消息之后写入主题.要避免这种情况,您只能在飞行中有一条消息.据我所知,如果您只想要一次性交付,可以将此属性设置为5 (3认同)
  • @PabloAntequera:我不知道以前的版本,但对于最新版本的 Kafka,enable.idempotence=true 可以防止重复并保持消息的顺序,即使 max.in.flight.requests.per.connection > 1 也是如此。解释在这里:https://docs.confluence.io/cloud/current/client-apps/optimizing/durability.html#duplication-and-ordering (2认同)

Dav*_*ora 6

是的,您必须将该max.in.flight.requests.per.connection属性设置为1。在您读过的文章中,这是一个最初的错误(目前已更正),作者写道:

每个会话的最大飞行请求数

Kafka 文档中不存在这一点。

这个勘误表可能来自《Kafka The Definitive Guide》(第一版)一书,您可以在第 52 页阅读:

<...因此,如果保证顺序至关重要,我们建议进行设置 in.flight.requests.per.session=1以确保在重试一批消息时,不会发送其他消息...>


Eug*_*ene 6

在我看来,了解这个问题是非常宝贵的,它使事情变得更加有趣并且稍微复杂一些。

\n

当您启用 时enable.idempotence=true,每次向代理发送消息时,您也会发送一个sequence number从零开始的 。经纪人也将该序列号存储在他们这边。当您向代理发出下一个sequence_id=3请求时,让\xe2\x80\x99s 说 with ,代理可以查看其当前存储的序列号并说:

\n
    \n
  • 如果是 4 - 好,则是一批新记录
  • \n
  • 如果是 3 - 它是重复的
  • \n
  • 如果是 5(或更高),则意味着消息丢失
  • \n
\n

现在max.inflight.requests.per.connection。生产者可以发送与此值一样多的并发请求,而无需实际等待代理的答复。当我们达到 3 时(让\xe2\x80\x99s 说max.inflight.requests.per.connection=3),我们开始向代理询问之前的结果(同时我们现在可以\xe2\x80\x99t 处理任何批次,即使它们已经准备好)。

\n

现在,为了举例,让\xe2\x80\x99s 说经纪人这样说:\xe2\x80\x9c1 没问题,我存储了它\xe2\x80\x9d,\xe2\x80\x9c2 失败了\ xe2\x80\x9d 现在是重要的部分:因为 2 失败了,所以 3 唯一可能得到的是 \xe2\x80\x9cout of order\xe2\x80\x9d,这意味着它没有存储它。客户端现在知道它需要重新处理2,并且3它创建一个列表并按照确切的顺序重新发送它们;如果启用重试。

\n

这个解释可能过于简化,但这是我阅读了一些源代码后的基本理解。

\n