如何在面向请求的应用程序中管理 Kafka 事务生产者对象

Raf*_*ira 5 transactions apache-kafka

当配置为事务性生产者时,在面向请求(例如http或RPC服务器)的应用程序中管理Kafka生产者对象的最佳实践是什么?具体来说,如何在服务线程之间共享生产者对象,以及如何定义transactional.id这些对象的配置值?

在非事务性使用中,生产者对象是线程安全的,并且在所有请求服务线程之间共享一个对象是很常见的。设置供 kafka 消费者线程使用的事务性生产者对象也很简单,只需为每个消费者线程实例化一个对象即可。

将事务生产者与面向请求的应用程序相结合似乎更加复杂,因为服务线程的生命周期通常由线程池动态控制。我可以想到一些选择,但都有缺点:

  1. 共享单个对象,通过某种互斥体防止并发。负载下的争用可能会是一个严重的问题。
  2. 为每个传入的请求实例化一个生产者对象。KafkaProducer 对象初始化很慢,因为它们维护网络连接、线程和其他重量级对象;为每个请求支付这笔费用似乎不切实际。
  3. 维护一个生产者对象池,并为每个请求租用一个。我看到的主要缺点是所需的机械数量。目前还不清楚如何配置transactional.id这些对象,因为它们的生命周期并没有像文档所述那样清晰地映射到分区、有状态应用程序中的分片标识符

还有其他选择吗?有没有一个最佳的方法?

gab*_*ssi 3

长话短说

\n

事务ID是为了防止僵尸进程造成的重复read-process-write读取和生成 kafka 主题的模式中由僵尸进程引起的重复。对于面向请求的应用程序,例如由传入的 http 请求生成的消息,事务 ID 不会带来任何好处(当然,如果您想使用事务并且不应该在生产者之间重复,您仍然需要分配一个 ID)。集群中的相同进程或不同进程)

\n

长答案

\n

正如文档所说,事务生产者不是线程安全的

\n
\n

正如示例中所暗示的,每个生产者只能有一个开放交易。beginTransaction() 和 commitTransaction() 调用之间发送的所有消息都将成为单个事务的一部分

\n
\n

因此,正如您正确解释的那样,无法并发访问生产者,因此我们必须选择您描述的三个选项之一。

\n

对于这个答案,我假设request oriented applications于http请求,因为该机制正在触发通过事务生成的消息(实际上,不止一条消息,否则对于幂等生产者来说就足够了,并且不需要事务) )

\n

就正确性而言,所有这些都可以,因为选项 1 可以工作,但根据您的应用程序吞吐量,它可能会出现高争用,选项 2 也可以工作,但您将付出更高延迟的代价,并且不会很严重\n恕我直言,我认为选项 3 可能是最好的,因为它是前两个选项之间的折衷方案,尽管当然需要更仔细的实施,而不仅仅是每次打开一个新的生产者。

\n

交易ID

\n

剩下的问题是如何将事务 id 分配给生产者,特别是在最后一种情况下(尽管选项 1 和 3 都有相同的关注点,因为在这两种情况下,我们都重复使用具有相同事务 id 的生产者来处理不同的请求)。

\n

要回答这个问题,我们首先需要了解 transactional.id 的目标是保护我们免受僵尸进程(挂起一段时间的进程,例如 bc 长时间 gc 暂停,并被认为是死进程)造成的重复消息的影响但过了一会儿又回来并继续),这称为僵尸击剑。

\n

了解僵尸防护需求的一个重要细节是了解它可能在哪种用例中发生,这是read-process-write从主题读取、处理元素并写入输出主题和偏移主题的模式,这为我们提供了原子性和恰好一次语义(如果您没有对流程步骤产生任何副作用)。\n幂等生产者可防止我们因生产者重试而导致重复(其中消息由代理保留,但确认未由代理接收)生产者)和 kafka 中的两阶段提交(我们不仅写入输出,还通过生成偏移量主题将消息标记为已消费)可以防止我们因多次消费消息而导致重复(如果进程在生成到输出主题之后但在提交偏移之前崩溃)。\n仍然存在一种微妙的情况,即可以引入重复项并且它是僵尸生产者,通过每次生产者调用initTransactions时单调增加一个纪元来隔离该生产者将与生产者发送的每条消息一起发送。\n因此,对于要隔离的生产者,应该使用相同的事务 id启动另一个生产者,这里的关键由 Jason Gustafson 解释本次演讲中解释

\n
\n

“我们正在寻找的是保证对于每个输入分区,只有一个写入负责读取该数据并写入输出”

\n
\n

这意味着 transactional.id 是根据分区以“读-进程-写”模式消耗而分配的。\n因此,如果已分配主题 A 的分区 0 的进程被认为已死亡,则会启动重新平衡,并且分配的新进程应该创建一个具有相同 transactional.id 的生产者,这就是为什么它应该像这个<prefix><group>.<topic>.<partition>答案中所描述的那样,其中分区是 transactional.id 的一部分。这也意味着为每个分区分配一个生产者,这也可能表示开销,具体取决于分配给消费者的主题和分区的数量。\n这张幻灯片来自演讲中的这张幻灯片阐明了这种情况

\n

进程崩溃前的事务 ID\n进程崩溃前的事务 ID

\n

崩溃后事务 ID 重新分配给其他进程\n在此输入图像描述

\n

http 请求中的事务 ID

\n

回到你原来的问题,http 请求不会遵循read-process-write僵尸可以引入重复的模式,因为每个http请求都是唯一的,即使你引入了一个唯一的标识符,从僵尸的角度来看它也将是不同的消息\n在这种情况下,我认为,如果您希望写入两个不同主题的原子性,那么使用事务生产者可能仍然有价值,但您可以为选项 2 选择随机事务 ID,或者将其重用于选项 1和 3.

\n

更新

\n

我的答案已经过时了,因为它基于旧版本的 kafka。 \n前面描述的每个分区有一个生产者的开销是一个问题,已在KIP-447中解决的一个问题

\n
\n

随着输入分区数量的增加,这种架构不能很好地扩展。每个生产者都有单独的内存缓冲区、单独的线程、单独的网络连接。这限制了生产者的性能,因为我们无法有效地使用多个任务的输出来改进批处理。它还会给代理带来不必要的负载,因为有更多的并发事务和更多的冗余元数据管理。

\n
\n

这是这篇文章中解释的主要区别

\n
\n

当消费者组重新平衡后完成分区分配时,消费者的第一步是始终获取下一个偏移量以开始获取数据。通过这一观察,OffsetFetch 协议保护得到增强,这样当消费者组具有与一个分区关联的待处理事务偏移时,OffsetFetch 调用可以被阻止,直到关联事务完成。以前,\xe2\x80\x9coutdated\xe2\x80\x9d 偏移数据将被返回,并且应用程序允许立即继续。

\n
\n

有了这个新功能,我不再清楚 transactional.id 的使用。

\n

尽管目前还不清楚为什么防护需要在存在待处理事务的情况下阻止轮询,但在我看来,发送消费者组元数据应该足够了(我假设僵尸生产者将通过提交老的 Generation.id 来进行防护) group.id,generation.id 在每次重新平衡时都会受到影响)看来 transactional.id 不再发挥主要作用。例如春天文档

\n
\n

在模式 V1 中,如果启动具有相同 transactional.id 的另一个实例,则生产者将被“隔离”。Spring 通过为每个 group.id/topic/partition 使用一个 Producer 来管理它;当重新平衡发生时,新实例将使用相同的 transactional.id 并且旧生产者被隔离。
\n使用模式 V2,不需要为每个 group.id/topic/partition 都有一个生产者,因为消费者元数据与事务的偏移量一起发送,并且代理可以使用该信息来确定生产者是否受到隔离。

\n
\n