小编Rad*_*tor的帖子

如何选择一个Kafka transaction.id

我想知道我能否在理解Kafka中的交易方面获得帮助,尤其是如何使用transaction.id。这里是上下文:

  1. 我的Kafka应用程序遵循以下模式:使用来自输入主题的消息,进行处理,然后发布到输出主题。
  2. 我使用的不是使用Kafka Streams API。
  3. 我在一个消费者组中有多个消费者,每个消费者都在自己的轮询线程中。
  4. 有一个带有工作线程的线程池,这些线程用于执行消息处理并将其发布到输出主题。目前,每个线程都有自己的生产者实例。
  5. 我正在使用已发布的事务API,以确保消耗偏移量的更新和对输出主题的发布是原子发生的

到目前为止,我的假设包括:

  1. 如果我的进程在中间事务中崩溃,那么该事务中的任何内容都不会发布,也不会消耗消耗。因此,重新启动后,我只需从原始的消耗偏移量再次启动事务即可。
  2. 对于生产者transaction.id,最重要的是它是唯一的。因此,我可以在启动时生成基于时间戳的ID

然后,我阅读了以下博客:https : //www.confluent.io/blog/transactions-apache-kafka/。特别是在“如何选择交易ID”部分中,这似乎意味着我需要保证每个输入分区都有一个生产者实例。它说:“正确抵御僵尸的关键是确保对于给定的transactional.id,读-写-写循环中的输入主题和分区始终相同。” 它还进一步列举了以下问题示例:“例如,在分布式流处理应用程序中,假设主题分区tp0最初是由transactional.id T0处理的。如果在以后的某个时候,它可以通过事务处理映射到另一个生产者.id T1,在T0和T1之间不会有隔离。因此,有可能重新处理来自tp0的消息,这完全违反了一次处理保证。”

我不太明白为什么会这样。在我看来,只要事务是原子的,我就不必关心哪个生产者会处理来自任何分区的消息。我已经为此努力了一天,我想知道是否有人可以告诉我我在这里错过的事情。因此,为什么不能将工作分配给具有任何transaction.id设置的任何生产者实例,只要它是唯一的。以及为什么他们说如果您这样做,则消息可能会通过事务提供的隔离机制泄漏。

java apache-kafka

8
推荐指数
3
解决办法
3454
查看次数

标签 统计

apache-kafka ×1

java ×1