Posts by tom*_*l01

"java.lang.IllegalStateException: Tried to lookup lag for unknown task 3_0" after upgrading Kafka Streams from 2.5.1 to 2.6.2

I just upgraded my Kafka Streams application from 2.5.1 to 2.6.2. It worked before the upgrade, but now it fails.

Here is the problematic topology (I have omitted the irrelevant Serdes):

val builder = new StreamsBuilder()

val contractEventStream: KStream[TariffId, ContractEvent] =
  builder.stream[String, ContractUpsertAvro](settings.contractsTopicName)
    .flatMap { (_, contractAvro) =>
      ContractEvent.from(contractAvro)
        .map(contractEvent => (contractEvent.tariffId, contractEvent))
    }

val tariffsTable: KTable[TariffId, Tariff] =
  builder.stream[String, TariffUpdateEventAvro](settings.tariffTopicName)
    .flatMapValues(Tariff.fromAvro(_))
    .selectKey((_, tariff) => tariff.tariffId)
    .toTable(Materialized.`with`(tariffIdSerde, tariffSerde)) // Materialized.as also throws the same IllegalStateException

contractEventStream
  .join(tariffsTable)(JourneyStep.from(_, _).asInstanceOf[ContractCreated])(Joined.`with`(tariffIdSerde, contractEventSerde, tariffSerde))
  .selectKey((_, contractUpdated) => contractUpdated.accountId)
  .foreach((_, journeyStep) => println(journeyStep))

The join throws the following exception:

java.lang.IllegalStateException: Tried to lookup lag for unknown task 3_0
    at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
    at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511) …

apache-kafka apache-kafka-streams

