如何选择一个Kafka transaction.id

Rad*_*tor 8 java apache-kafka

我想知道我能否在理解Kafka中的交易方面获得帮助,尤其是如何使用transaction.id。这里是上下文:

  1. 我的Kafka应用程序遵循以下模式:使用来自输入主题的消息,进行处理,然后发布到输出主题。
  2. 我使用的不是使用Kafka Streams API。
  3. 我在一个消费者组中有多个消费者,每个消费者都在自己的轮询线程中。
  4. 有一个带有工作线程的线程池,这些线程用于执行消息处理并将其发布到输出主题。目前,每个线程都有自己的生产者实例。
  5. 我正在使用已发布的事务API,以确保消耗偏移量的更新和对输出主题的发布是原子发生的

到目前为止,我的假设包括:

  1. 如果我的进程在中间事务中崩溃,那么该事务中的任何内容都不会发布,也不会消耗消耗。因此,重新启动后,我只需从原始的消耗偏移量再次启动事务即可。
  2. 对于生产者transaction.id,最重要的是它是唯一的。因此,我可以在启动时生成基于时间戳的ID

然后,我阅读了以下博客:https : //www.confluent.io/blog/transactions-apache-kafka/。特别是在“如何选择交易ID”部分中,这似乎意味着我需要保证每个输入分区都有一个生产者实例。它说:“正确抵御僵尸的关键是确保对于给定的transactional.id,读-写-写循环中的输入主题和分区始终相同。” 它还进一步列举了以下问题示例:“例如,在分布式流处理应用程序中,假设主题分区tp0最初是由transactional.id T0处理的。如果在以后的某个时候,它可以通过事务处理映射到另一个生产者.id T1,在T0和T1之间不会有隔离。因此,有可能重新处理来自tp0的消息,这完全违反了一次处理保证。”

我不太明白为什么会这样。在我看来,只要事务是原子的,我就不必关心哪个生产者会处理来自任何分区的消息。我已经为此努力了一天,我想知道是否有人可以告诉我我在这里错过的事情。因此,为什么不能将工作分配给具有任何transaction.id设置的任何生产者实例,只要它是唯一的。以及为什么他们说如果您这样做,则消息可能会通过事务提供的隔离机制泄漏。

Emi*_*nov 12

考虑消费者群体不断变化的情况(新消费者上线或下线)或失败场景导致消费者群体内主题分区分配的重新平衡。

现在假设一个消费者C0之前已经被分配了 partition P0。这个消费者很高兴地开始处理消息、发布新消息等。(标准的消费-转换-发布模式。)发生重新平衡事件,导致P0被毫不客气地(总是想使用这个词)从 撤销C0并分配给C1。从 的角度来看C0,它可能仍然有大量消息需要处理,并且它没有注意到重新分配。你的情况最终其中两个C0C1的时间很短的时间可能会认为他们都“拥有” P0,并采取相应的行动,即将离任的主题创建重复的邮件,更糟糕的,有这些重复出现的潜在的顺序进行。

的使用transactional.id启用了原始博客所指的“围栏” 。作为重新分配的一部分,新生产者将在增加的纪元编号下行动,而现有生产者仍将使用旧纪元。击剑是微不足道的;丢弃时代已经过去的消息。

Kafka 交易有一些问题:

  • 入站和出站主题必须位于同一个集群上,事务才能正常工作。
  • 的命名transactional.id对于生产者的“移交”至关重要,即使您不关心僵尸围栏。新生产者的出现将促使为失效的生产者整理任何孤立的正在进行的交易,因此要求 ID 在生产者会话中保持稳定/可重复。不要为此使用随机 ID;这不仅会导致不完整的交易(在READ_COMMITTED模式中阻塞每个消费者),而且还会在交易协调器(在代理上运行)上积累额外的状态。默认情况下,此状态将持续 7 天,因此您不想一时兴起产生任意命名的事务生产者。
  • 理想情况下transactional.id反映了入站主题分区的组合。(当然,除非您有一个单分区主题。)实际上,这意味着为分配给消费者的每个分区创建一个新的事务性生产者。(记住,在consume-trasform-publish场景中,生产者也是消费者,消费者分区分配会随着每个重新平衡事件而变化。)看看spring-kafka的实现,它懒惰地为每个入站创建一个新的生产者划分。(关于这种方法的安全性,以及是否应该在分区重新分配时清理生产者,有一些话要说,但那是另一回事。)
  • 防护机制仅在 Kafka 级别运行。换句话说,它将失效的生产者与 Kafka 隔离,而不是与世界其他地方隔离。这意味着,如果您的生产者还必须更新某些外部状态(在数据库、缓存等中)作为消费-转换-发布周期的一部分,则应用程序有责任在分区重新分配时将自身与数据库隔离,或以其他方式确保更新的幂等性。

为了完整起见,值得指出的是,这不是实现围栏的唯一方法。Kafka 消费者 API 确实为用户提供了注册 a 的能力ConsumerRebalanceListener,这为被取代的消费者提供了在将分区重新分配给新消费者之前排空任何未完成的积压(或摆脱它)的最后机会。回调被阻塞;当它返回时,假定处理程序已在本地将自己隔离起来;然后,只有这样,新的消费者才会恢复处理。

  • >`关于这种方法的安全性,以及是否应该在分区重新分配时清理生产者,还有一些话要说,但那是另一回事了。`。当分区被撤销时,分配的生产者将关闭。` ProducerFactory.closeProducerFor(zombieFenceTxIdSuffix(tp.topic(), tp.partition()));` 我们将此方案基于 Kafka Streams 的实现方式。 (2认同)

Rya*_*bol 8

您提到的博客文章虽然包含大量内容,但具有所需的所有信息。

为什么交易?上述文章中的部分。

使用配置为至少一次传递语义的香草Kafka生产者和消费者,流处理应用程序一旦以以下方式处理语义,就可能完全丢失:

  1. producer.send()由于内部重试,可能导致消息B的重复写入。这由幂等的生产者解决,而不是本文其余部分的重点。

  2. 我们可能会重新处理输入消息A,导致重复的B消息被写到输出中,这完全违反了一次处理语义。如果流处理应用程序在写入B之后但将A标记为已使用之前崩溃,则可能会发生重新处理。因此,当恢复时,它将再次消耗A并再次写入B,从而导致重复。

  3. 最后,在分布式环境中,应用程序将崩溃或(更糟!)暂时失去与系统其余部分的连接。通常,新实例会自动启动以替换被认为丢失的实例。通过此过程,我们可能有多个实例处理相同的输入主题并写入相同的输出主题,从而导致输出重复,并且违反了一次处理语义的方式。我们称这个问题“僵尸实例。” [添加重点]

来自同一篇文章的“ 事务语义”部分。

僵尸击剑

通过要求为每个事务生成器分配一个唯一的标识符称为,我们解决了僵尸实例的问题transactional.id这用于在流程重新启动时标识相同的生产者实例。[重点添加]

该API要求事务处理生产者的第一个操作应该是transactional.id在Kafka集群中显式注册它。当这样做时,Kafka经纪人会检查给定的未transactional.id完成交易并完成交易。它还会增加与关联的纪元transactional.id。时期是为每个储存的内部元数据片段transactional.id

一旦时期被颠覆,任何具有相同transactional.id和较旧时期的生产者都被认为是僵尸,被围起来。那些生产者将来的交易性写被拒绝了。[重点添加]

并且来自同一篇文章的“ 数据流”部分。

答:生产者与交易协调者的互动

在执行事务时,生产者在以下几点向事务协调器发出请求:

  1. initTransactionsAPI注册了一个transactional.id与协调。此时,协调器将关闭与此相关的所有未决事务,transactional.id并冲破纪元以掩盖僵尸。每个生产者会话仅发生一次。[重点添加]

  2. 当生产者将在事务中首次将数据发送到分区时,首先要向协调器注册该分区。

  3. 当应用程序调用commitTransaction或时abortTransaction,将向协调器发送一个请求以开始两阶段提交协议。

希望这可以帮助!


Vas*_*lis 5

使用 Streams API 时(与常规 Kafka 生产者相比),您不必担心为transactional.id流应用程序的每个实例设置唯一的值。当您启用 Streamsexactly_once语义时,Streams API 将根据主题/分区生成正确/唯一的 transactional.id。

看看这里到底做了什么:https://github.com/axbaretto/kafka/blob/fe51708ade3cdf4fe9640c205c66e3dd1a110062/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L455

Task(指代码中的TaskId)解释如下: https: //docs.confluence.io/current/streams/architecture.html#stream-partitions-and-tasks