Vas*_*kyi 6 java apache-kafka apache-kafka-streams
我们使用Kafka Streams来消费,处理和产生消息,而在PROD env上,我们面临着多个主题的错误:
ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=app-xxx-StreamThread-3-consumer, groupId=app]
Offset commit failed on partition xxx-1 at offset 13920:
The request timed out.[]
Run Code Online (Sandbox Code Playgroud)
对于负载较小的主题,这些错误很少发生,但是对于负载较高(和峰值)的主题,每个主题每天都会发生数十次错误。主题具有多个分区(例如10个)。似乎此问题不会影响数据处理(尽管有性能),因为在引发异常(即使对于相同的偏移量可能是多个错误)之后,使用者随后重新读取消息并成功处理它。
我看到此错误消息由于PR而出现在kafka-clients
版本中,但是在同一用例的早期版本中(在消费者上),类似的消息()被记录为级别。对于我来说,将日志级别更新为这种用例的警告会更合乎逻辑。1.0.0
kafka-clients
Errors.REQUEST_TIMED_OUT
Offset commit for group {} failed: {}
debug
如何解决这个问题?可能是根本原因?也许更改使用者属性或分区设置可以帮助摆脱此类问题。
我们使用以下实现来创建Kafka流:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.<String, String>stream(topicName);
stream.foreach((key, value) -> processMessage(key, value));
Topology topology = builder.build();
StreamsConfig streamsConfig = new StreamsConfig(consumerSettings);
new KafkaStreams(streamsTopology, streamsConfig);
Run Code Online (Sandbox Code Playgroud)
我们的Kafka消费者设置:
bootstrap.servers: xxx1:9092,xxx2:9092,...,xxx5:9092
application.id: app
state.dir: /tmp/kafka-streams/xxx
commit.interval.ms: 5000 # also I tried default value 30000
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
Run Code Online (Sandbox Code Playgroud)
kafka经纪人版本:kafka_2.11-0.11.0.2
。两个版本的Kafka Streams都发生错误:1.0.1
和1.1.0
。
小智 3
看来您的 Kafka 集群存在问题,并且 Kafka 消费者在尝试提交偏移量时超时。您可以尝试增加Kafka消费者的连接相关配置
该配置控制客户端等待请求响应的最长时间
在该配置指定的毫秒数后关闭空闲连接。
归档时间: |
|
查看次数: |
6105 次 |
最近记录: |