Kafka Connect将同一任务分配给多个工作人员

Tim*_*mur 10 apache-kafka apache-kafka-connect

我在分布式模式下使用Kafka Connect。我现在多次观察到的一个奇怪行为是,经过一段时间(可能是几个小时,可能是几天),似乎出现了平衡错误:将相同的任务分配给多个工作人员。结果,它们并发运行,并且取决于连接器的性质,它们会失败或产生“不可预测的”输出。

我能够用来重现此行为的最简单的配置是:两个Kafka Connect工作程序,两个连接器,每个连接器仅执行一项任务。Kafka Connect已部署到Kubernetes中。Kafka本身位于Confluent Cloud中。Kafka Connect和Kafka的版本相同(5.3.1)。

日志中的相关消息:

工人A:

[2019-10-30 12:44:23,925] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,926] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-hdfs-sink, some-mqtt-source], taskIds=[some-hdfs-sink-0, some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
Run Code Online (Sandbox Code Playgroud)

工人B:

[2019-10-30 12:44:23,930] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,936] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-mqtt-source], taskIds=[some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
Run Code Online (Sandbox Code Playgroud)

在上面的日志摘录中,您可以观察到相同的任务(some-mqtt-source-0)被分配给了两个工作人员。收到此消息之后,我还可以在两个工作线程上看到任务实例的日志消息。

此行为不取决于连接器(我在其他任务中也观察到了)。它也不会在工作人员启动后立即发生,而只是在一段时间后才发生。

我的问题是造成这种现象的原因是什么?

编辑1:我尝试运行3个工人,而不是两个,认为这可能是一个分布式共识问题。似乎不是,有3个工作人员无法解决问题。

编辑2:我注意到,在为工作人员A分配了最初在工作人员B上运行的任务之前,该工作人员(B)观察到加入组时出错。例如,如果任务在N代中被“复制”,则工作程序B在日志中将没有“已成功加入N代的组”消息。更重要的是,在第N-1和N + 1代之间,工作程序B通常记录类似Attempt to heartbeat failed for since member id和的错误Group coordinator bx-xxx-xxxxx.europe-west1.gcp.confluent.cloud:9092 (id: 1234567890 rack: null) is unavailable or invalid。工人B通常在N代之后不久加入N + 1代(有时仅在3秒钟后加入)。现在很清楚是什么触发了该行为。然而:

  • 尽管我了解到可能存在此类临时性问题,并且在一般情况下它们可能很正常,但是为什么所有服务器成功加入下一代服务器重新平衡不能解决该问题?尽管会进行更多的平衡-它不能正确分配任务,并永久保留“重复项”(直到我重新启动工作程序为止)。

  • 似乎在某些时期,重新平衡几乎每几小时发生一次,而在其他时期,它每5分钟(精确到几秒钟)发生一次;可能是什么原因?这是正常的吗?

  • 考虑到我使用的是Confluent Cloud,“组协调器不可用或无效”错误的原因可能是什么?在Kafka Connect中是否可以调整任何配置参数,以使其针对此错误更具弹性?我知道有session.timeout.msheartbeat.interval.ms,但是文档如此简单,甚至不清楚将这些参数更改为较小或较大的值会有什么实际影响。

编辑3:我观察到该问题对于接收器任务并不关键:尽管将相同的接收器任务分配给了多个工作程序,但将相应的使用者分配给了他们通常应该分配给的不同分区,并且几乎一切都可以正常工作-我只是得到了更多任务比我最初要求的要多。但是,在源任务的情况下,行为将中断 -任务同时运行并在源端争夺资源。

编辑4:同时,我将Kafka Connect降级为2.2版(Confluent Platform 5.2.3)-之前的“增量合作平衡”版本。最近2天都可以正常使用。因此,我认为该行为与新的重新平衡机制有关。

Den*_*din 2

正如评论中提到的,Jira Kafka-9184就是为了解决这个问题而制作的,并且已经解决了。

该修复程序在 2.3.2 及更高版本中可用。

因此,现在的答案是:升级到最新版本应该可以防止此问题的发生。