Mar*_* P. 7 java apache-kafka spring-boot spring-cloud-stream apache-kafka-streams
请参阅下面的更新以显示潜在的解决方法
我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增加分区数量时,我们注意到生成到输出主题的消息数量减少了。
在启动应用程序之前,我们使用多个分区配置测试了这一理论。使用 1 个分区,我们可以看到 100% 的消息。使用 2,我们会看到一些消息(少于 50%)。对于 10,我们几乎看不到任何东西(少于 10%)。
因为我们离开了,所以从主题 1 消费的每条消息都应该写入我们的输出主题,但我们发现这并没有发生。消息似乎卡在从 Ktables 的外键连接创建的“中间”主题中,但没有错误消息。
任何帮助将不胜感激!
服务.java
@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
    return (topicOne, topicTwo) ->
            topicOne
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}
构建.gradle
plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}
...
ext {
    set('springCloudVersion', "Hoxton.SR6")
}
...
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
注意:由于 spring-cloud-stream 中包含的版本中存在错误,我们排除了 org.apache.kafka 依赖项
应用程序.yml
spring:
  application:
    name: app-name
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: ${spring.application.name}
        process-in-1:
          destination: topic2
          group: ${spring.application.name}
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BROKERS}
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2
测试场景:
举一个具体的例子,如果我将以下 3 条消息发布到主题 1:
{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
输出主题只会收到 2 条消息。
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
其他2人怎么了?似乎某些键/值对无法写入输出主题。重试这些“丢失”的消息也不起作用。
更新:
通过将主题 1 用作 KStream 而不是 KTable 并toTable()在继续执行 KTable-KTable 连接之前调用,我能够使此功能正常运行。我仍然不确定为什么我的原始解决方案不起作用,但希望这个解决方法可以对实际问题有所了解。
@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
    return (topicOne, topicTwo) ->
            topicOne
                    .map(...)
                    .toTable()
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}
根据问题的描述,(左)KTable 输入主题中的数据似乎未按其键正确分区。对于单个分区主题,那么,只有一个分区,所有数据都进入这一个分区,并且连接结果是完整的。
但是,对于多分区输入主题,您需要确保数据按键分区,否则,具有相同键的两条记录可能最终位于不同的分区中,从而导致连接失败(因为连接是在每个分区上完成的) -分区基础)。
请注意,即使外键连接不要求两个输入主题共同分区,仍然要求每个输入主题本身按其键分区!
如果您使用 a,map().toTable()您基本上会触发数据的内部重新分区,以确保数据按键分区,这可以解决问题。
| 归档时间: | 
 | 
| 查看次数: | 970 次 | 
| 最近记录: |