相关疑难解决方法(0)

Kafka KStreams - 处理超时

我试图<KStream>.process()用a TimeWindows.of("name", 30000)来批量处理一些KTable值并发送它们.似乎30秒超过了消费者超时间隔,之后Kafka认为该消费者已经解散并释放分区.

我已经尝试提高轮询频率和提交间隔以避免这种情况:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
Run Code Online (Sandbox Code Playgroud)

不幸的是,这些错误仍在发生:

(很多这些)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
Run Code Online (Sandbox Code Playgroud)

其次是:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

8
推荐指数
1
解决办法
6490
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1

java ×1