当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息

Mar*_* P. 7 java apache-kafka spring-boot spring-cloud-stream apache-kafka-streams

请参阅下面的更新以显示潜在的解决方法

我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增加分区数量时,我们注意到生成到输出主题的消息数量减少了。

在启动应用程序之前,我们使用多个分区配置测试了这一理论。使用 1 个分区,我们可以看到 100% 的消息。使用 2,我们会看到一些消息(少于 50%)。对于 10,我们几乎看不到任何东西(少于 10%)。

因为我们离开了,所以从主题 1 消费的每条消息都应该写入我们的输出主题,但我们发现这并没有发生。消息似乎卡在从 Ktables 的外键连接创建的“中间”主题中,但没有错误消息。

任何帮助将不胜感激!

服务.java

@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}
Run Code Online (Sandbox Code Playgroud)

构建.gradle

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

...

ext {
    set('springCloudVersion', "Hoxton.SR6")
}

...

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
Run Code Online (Sandbox Code Playgroud)

注意:由于 spring-cloud-stream 中包含的版本中存在错误,我们排除了 org.apache.kafka 依赖项

应用程序.yml

spring:
  application:
    name: app-name
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: ${spring.application.name}
        process-in-1:
          destination: topic2
          group: ${spring.application.name}
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BROKERS}
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2
Run Code Online (Sandbox Code Playgroud)

测试场景:

举一个具体的例子,如果我将以下 3 条消息发布到主题 1:

{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
Run Code Online (Sandbox Code Playgroud)

输出主题只会收到 2 条消息。

{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
Run Code Online (Sandbox Code Playgroud)

其他2人怎么了?似乎某些键/值对无法写入输出主题。重试这些“丢失”的消息也不起作用。

更新:

通过将主题 1 用作 KStream 而不是 KTable 并toTable()在继续执行 KTable-KTable 连接之前调用,我能够使此功能正常运行。我仍然不确定为什么我的原始解决方案不起作用,但希望这个解决方法可以对实际问题有所了解。

@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .map(...)
                    .toTable()
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}
Run Code Online (Sandbox Code Playgroud)

Mat*_*Sax 3

根据问题的描述,(左)KTable 输入主题中的数据似乎未按其键正确分区。对于单个分区主题,那么,只有一个分区,所有数据都进入这一个分区,并且连接结果是完整的。

但是,对于多分区输入主题,您需要确保数据按键分区,否则,具有相同键的两条记录可能最终位于不同的分区中,从而导致连接失败(因为连接是在每个分区上完成的) -分区基础)。

请注意,即使外键连接不要求两个输入主题共同分区,仍然要求每个输入主题本身按其键分区!

如果您使用 a,map().toTable()您基本上会触发数据的内部重新分区,以确保数据按键分区,这可以解决问题。

  • 您说得对,我们的问题与密钥分区有关。我们稍微改变了架构,所以现在我们加入了来自生产者的 KTable,它们都使用 kafka-streams-binder。看来我们之前的架构是有问题的,因为 kafka-binder 和 kafka-streams-binder 的默认分区策略必须略有不同。我将选择您的帖子作为答案,因为它提供了对我们问题的根本原因的一些见解。谢谢@MatthiasJ.Sax (2认同)