"java.lang.IllegalStateException: Tried to lookup lag for unknown task 3_0" after upgrading Kafka Streams from 2.5.1 to 2.6.2

tom*_*l01 7 apache-kafka apache-kafka-streams

I just upgraded a Kafka Streams application from 2.5.1 to 2.6.2. It used to work, but now it doesn't.

Here is the troublesome topology (I have omitted the irrelevant Serdes):

val builder = new StreamsBuilder()

val contractEventStream: KStream[TariffId, ContractEvent] =
  builder.stream[String, ContractUpsertAvro](settings.contractsTopicName)
    .flatMap { (_, contractAvro) =>
      ContractEvent.from(contractAvro)
        .map(contractEvent => (contractEvent.tariffId, contractEvent))
    }

val tariffsTable: KTable[TariffId, Tariff] =
  builder.stream[String, TariffUpdateEventAvro](settings.tariffTopicName)
    .flatMapValues(Tariff.fromAvro(_))
    .selectKey((_, tariff) => tariff.tariffId)
    .toTable(Materialized.`with`(tariffIdSerde, tariffSerde)) // Materialized.as also throws the same IllegalStateExceptions

contractEventStream
  .join(tariffsTable)(JourneyStep.from(_, _).asInstanceOf[ContractCreated])(
    Joined.`with`(tariffIdSerde, contractEventSerde, tariffSerde))
  .selectKey((_, contractUpdated) => contractUpdated.accountId)
  .foreach((_, journeyStep) => println(journeyStep))

The join throws the following exception:

java.lang.IllegalStateException: Tried to lookup lag for unknown task 3_0
    at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
    at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
    at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
    at java.util.TreeMap.compare(TreeMap.java:1295)
    at java.util.TreeMap.put(TreeMap.java:538)
    at java.util.TreeSet.add(TreeSet.java:255)
    at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
    at java.util.TreeSet.addAll(TreeSet.java:312)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1275)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1189)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:940)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:399)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:684)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:597)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:560)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1160)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1135)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

I can't see what I'm doing wrong. The code above works with Kafka 2.5.1. Does anyone know what is going on?

小智 14

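The problem is caused by the Kafka Streams cache (its local state), which is kept on disk. This cache is specific to the Kafka version and to the Kafka Streams topology you use, so a change in the topology can also lead to this error.

The cache is usually located under /tmp, or in another location if you pass the "state.dir" property to Kafka Streams. Stop the application, clear the directory containing the cache, and you should be able to start again cleanly.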
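As a rough sketch of that cleanup step (not an official tool): as far as I know, Kafka Streams lays out its local state as <state.dir>/<application.id>, so removing that one subdirectory is usually enough. The object and method names below are hypothetical, as is the application id "my-streams-app" mentioned afterwards; /tmp/kafka-streams is assumed to be the state.dir in effect.

```scala
import java.io.File
import java.nio.file.{Path, Paths}

// Hypothetical helper, not part of Kafka: removes the on-disk state of one Streams application.
object StateDirCleaner {

  // Assumption: local state lives under <state.dir>/<application.id>.
  def appStateDir(stateDir: String, applicationId: String): Path =
    Paths.get(stateDir).resolve(applicationId)

  // Deletes a file, or a directory together with everything below it.
  // Does nothing if the path does not exist.
  def deleteRecursively(file: File): Unit = {
    // listFiles() returns null when `file` is not a directory, hence the Option wrapper.
    Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
    file.delete()
    ()
  }
}
```

With the application stopped, calling StateDirCleaner.deleteRecursively(StateDirCleaner.appStateDir("/tmp/kafka-streams", "my-streams-app").toFile) would wipe only that application's state. Alternatively, calling cleanUp() on the KafkaStreams instance before start() (or after close()) clears the state directory for the configured application id. In both cases the stores should be rebuilt from their changelog topics on the next start, which can take a while for large state.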

  • Saved my day! On macOS it resides in "/private/tmp/kafka-streams" (2 upvotes)