tom*_*l01 7 apache-kafka apache-kafka-streams
I just upgraded a Kafka Streams application from 2.5.1 to 2.6.2. It used to work, and now it doesn't.
This is the offending topology (I have omitted the irrelevant Serdes):
val builder = new StreamsBuilder()

val contractEventStream: KStream[TariffId, ContractEvent] =
  builder.stream[String, ContractUpsertAvro](settings.contractsTopicName)
    .flatMap { (_, contractAvro) =>
      ContractEvent.from(contractAvro)
        .map(contractEvent => (contractEvent.tariffId, contractEvent))
    }

val tariffsTable: KTable[TariffId, Tariff] =
  builder.stream[String, TariffUpdateEventAvro](settings.tariffTopicName)
    .flatMapValues(Tariff.fromAvro(_))
    .selectKey((_, tariff) => tariff.tariffId)
    .toTable(Materialized.`with`(tariffIdSerde, tariffSerde)) // Materialized.as also throws the same IllegalStateExceptions

contractEventStream
  .join(tariffsTable)(JourneyStep.from(_, _).asInstanceOf[ContractCreated])(Joined.`with`(tariffIdSerde, contractEventSerde, tariffSerde))
  .selectKey((_, contractUpdated) => contractUpdated.accountId)
  .foreach((_, journeyStep) => println(journeyStep))
The join throws the following exception:
java.lang.IllegalStateException: Tried to lookup lag for unknown task 3_0
at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
at java.util.TreeMap.compare(TreeMap.java:1295)
at java.util.TreeMap.put(TreeMap.java:538)
at java.util.TreeSet.add(TreeSet.java:255)
at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
at java.util.TreeSet.addAll(TreeSet.java:312)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1275)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1189)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:940)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:399)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:684)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:597)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:560)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1160)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1135)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
I can't see what I'm doing wrong. The code above works with Kafka 2.5.1. Does anyone know what is going on?
小智 14
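The problem is caused by the Kafka Streams cache, which is persisted on disk. This cache is specific to the Kafka version and to the Kafka Streams topology you are using (so a change to the topology can also trigger this error).
The cache is typically located under /tmp, or elsewhere if you pass the "state.dir" property to Kafka Streams. Clear the directory that holds the cache and you should be able to start cleanly again.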
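As an alternative to deleting the directory by hand, the local state can be wiped from code with `KafkaStreams#cleanUp()` before the application starts. The following is only a minimal sketch: the `CleanStart` object, the `startClean` helper, the application id, the broker address and the state directory path are all assumptions for illustration, not taken from the question.

```scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

object CleanStart {

  // Hypothetical helper: wipes this application's local state, then starts it.
  def startClean(topology: Topology, props: Properties): KafkaStreams = {
    val streams = new KafkaStreams(topology, props)

    // cleanUp() deletes the local state directory for this application.id
    // under state.dir. It may only be called while the instance is not
    // running, i.e. before start() or after close(); otherwise it throws
    // an IllegalStateException.
    streams.cleanUp()

    streams.start()
    sys.addShutdownHook(streams.close())
    streams
  }

  // Example configuration; every value here is an assumption.
  def exampleProps(): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "contract-journey-app") // assumed name
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")    // assumed broker
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams")    // optional, assumed path
    props
  }
}
```

With the topology from the question this would be called as `CleanStart.startClean(builder.build(), CleanStart.exampleProps())`. After a cleanup, the state stores are rebuilt from their changelog topics, so the first start can take longer than usual. For that reason it is probably better to run the cleanup once after an upgrade or topology change than on every start.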