Kafka Stream中幂等与正好一次的区别

San*_*eep 8 apache-kafka apache-kafka-streams

我正在查看文档,据我了解,通过启用以下功能,我们可以实现一次准确的交易 idempotence=true

幂等:幂等生产者针对单个主题为生产者启用一次。基本上,每条发送的邮件都有垃圾保证,在出现错误的情况下不会重复

那么,如果我们已经具有幂等性,那为什么我们要在Kafka Stream中一次只需要另一个属性呢?幂等与完全一次之间到底有什么不同

为什么普通Kafka Producer中不存在一次完全属性?

sun*_*007 19

在分布式环境中,故障是一种非常常见的情况,随时可能发生。在 Kafka 环境中,broker 可能会崩溃、网络故障、处理失败、消息发布失败或消息消费失败等。这些不同的场景引入了不同类型的数据丢失和重复。

故障场景

A(Ack Failed):生产者通过 retry>1 成功发布消息,但由于失败而无法接收确认。在这种情况下,生产者将重试可能引入重复的相同消息。

在此处输入图片说明

B(生产者进程在批量消息中失败):生产者发送一批消息失败,几乎没有发布成功。在这种情况下,一旦生产者重新启动,它将再次重新发布批次中的所有消息,这将在 Kafka 中引入重复。 在此处输入图片说明

C(Fire & Forget Failed)生产者发布消息,retry=0(fire and forget)。万一发布失败,将不知道并发送下一条消息,这将导致消息丢失。 在此处输入图片说明

D(Consumer failed in batch message)消费者收到来自Kafka的一批消息并手动提交他们的偏移量(enable.auto.commit=false)。如果消费者在提交给 Kafka 之前失败了,下一次消费者将再次消费相同的记录,从而在消费者端复制重复。

在此处输入图片说明

恰好一次语义

在这种情况下,即使生产者尝试重新发送消息,也会导致该消息仅被消费者发布和消费一次。

为了在 Kafka 中实现 Exactly-Once 语义,它使用以下 3 个属性

  1. enable.idempotence=true (地址 a, b & c)
  2. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5(生产者每个连接总是有一个正在进行的请求)
  3. isolation.level=read_committed(地址 d)

启用幂等(enable.idempotence=true)

幂等交付使生产者能够在单个生产者的生命周期内将消息准确地写入 Kafka 到主题的特定分区一次,而不会丢失数据和每个分区的顺序。

“请注意,启用幂等性要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于或等于 5,RETRIES_CONFIG 大于 0 且 ACKS_CONFIG 为 'all'。如果用户未明确设置这些值,则将选择合适的值。如果不兼容的值是设置,一个 ConfigException 将被抛出”

为了实现幂等性,Kafka 在生成消息时使用了一个唯一的 id,称为产品 id 或 PID 和序列号。生产者不断增加发布的每条消息的序列号,这些消息映射到唯一的 PID。代理总是将当前序列号与前一个序列号进行比较,如果新序列号不比前一个序列号大 +1,则它会拒绝,这避免了重复,并且如果超过更大的显示在消息中丢失,则同时拒绝 在此处输入图片说明

在失败场景中,broker 会将序列号与前一个序列号进行比较,如果序列号没有增加,+1 将拒绝该消息。 在此处输入图片说明

交易(isolation.level)

事务使我们能够以原子方式更新多个主题分区中的数据。包含在交易中的所有记录都将被成功保存,或者它们都不会被保存。它允许您在同一个事务中提交您的消费者偏移量以及您已处理的数据,从而允许端到端的恰好一次语义。

生产者不会等待向 Kafka 写入消息,而生产者使用 beginTransaction、commitTransaction 和 abortTransaction(在失败的情况下)消费者使用隔离级别 read_committed 或 read_uncommitted

  • read_committed:消费者将始终只读取提交的数据。
  • read_uncommitted:按偏移顺序读取所有消息,无需等待事务提交

如果具有隔离级别=read_committed 的消费者到达尚未完成的事务的控制消息,则在生产者提交或中止事务或发生事务超时之前,它将不会再从该分区传递任何消息。事务超时由生产者使用配置transaction.timeout.ms(默认1分钟)确定。

Exactly-Once 在生产者和消费者中

在正常情况下,我们有单独的生产者和消费者。生产者必须幂等同时管理事务,因此消费者可以使用隔离级别来只读 read_committed 使整个过程成为原子操作。这保证了生产者将始终与源系统同步。即使生产者崩溃或事务中止,它也始终是一致的,并且将一条消息或一批消息作为一个单元发布一次。

同一个消费者将接收一条消息或一批消息作为一个单元一次。

在 Exactly-Once 语义中,Producer 和 Consumer 将作为原子操作出现,并将作为一个单元运行。要么发布并完全消费一次,要么中止。

在 Kafka Stream 中恰好一次

Kafka Stream 使用来自主题 A 的消息,处理并将消息发布到主题 B,一旦发布,使用提交(提交主要是在秘密运行)将所有状态存储数据刷新到磁盘。

Kafka Stream 中的 Exactly-once 是一种读-处理-写模式,可保证此操作将被视为原子操作。由于Kafka Stream同时满足了生产者、消费者和事务的需求,因此Kafka Stream带有特殊的参数processing.guarantee,它可以是exact_once或at_least_once,这使得不必单独处理所有参数变得容易。

Kafka Streams 以原子方式更新消费者偏移量、本地状态存储、状态存储更改日志主题和生产以一起输出主题。如果这些步骤中的任何一个失败,则所有更改都会回滚。

processing.guarantee:exactly_once 自动提供以下参数,您无需明确设置

  1. isolation.level=read_committed
  2. enable.idempotence=true
  3. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5


hqt*_*hqt 12

Kafka流从端到端的角度提供了一次精确的语义(从一个主题进行消费,对该消息进行处理,然后生成另一个主题)。但是,您仅提及生产者的幂等属性。这只是整体的一小部分。

我再改一下这个问题:

为什么我们已经在生产者端保证了一次准确的传递语义,但为什么在消费者方需要一次精确的传递语义呢?

答:因为一次准确的传递语义不仅在生产步骤,而且在整个处理流程中。为了在语义上实现一次准确交付,必须满足一些生产和消费条件。

这是一般情况:进程A生成到主题T的消息。同时,进程B尝试使用主题T中的消息。我们要确保进程B永远不会处理一条消息两次。

生产者部分:我们必须确保生产者永远不会两次产生消息。我们可以使用Kafka等幂生产者

使用者部分: 这是使用者的基本工作流程:

  • 步骤1:消费者成功地从Kafka的主题中提取了消息M。
  • 步骤2:使用者尝试执行作业,作业成功返回。
  • 步骤3:使用者将邮件的偏移量提交给Kafka经纪人。

以上步骤只是一条快乐的路。现实中出现了许多问题。

  • 方案1:步骤2上的作业成功执行,但是使用者崩溃了。由于这种意外情况,使用者尚未提交该消息的抵消额。当使用者重新启动时,该消息将被使用两次。
  • 方案2:使用者在步骤3中提交偏移量时,由于硬件故障(例如:CPU,内存冲突等)而崩溃,重新启动时,使用者无法知道它是否已成功提交偏移量。

因为可能发生许多问题,所以作业的执行和提交偏移必须是原子的,以确保在消费者端一次准确地交付语义。这并不意味着我们不能,但是要确保一次准确的交付语义就需要付出很多努力。Kafka Stream支持工程师的工作。

注意: Kafka Stream提供“一次精确的流处理”。它是指从一个主题进行消费,在Kafka主题中实现中间状态并产生一个状态。如果我们的应用程序依赖于某些其他外部服务(数据库,服务...),则必须确保我们的外部依赖项可以保证在这种情况下仅一次。

TL,DR:要使流程完整一次,就需要生产者和消费者之间的合作。

参考文献: