Maj*_*imi 10 java apache-kafka
我们有一个具有以下配置的 KStreams 应用程序:
props.setProperty(RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
props.setProperty(RETRY_BACKOFF_MS_CONFIG, "5000"); // 5 seconds
props.setProperty(RECONNECT_BACKOFF_MS_CONFIG, "5000"); // 5 seconds
props.setProperty(REQUEST_TIMEOUT_MS_CONFIG, "5000"); // 5 seconds
props.setProperty(SESSION_TIMEOUT_MS_CONFIG, "25000"); // 25 seconds session timeout
props.setProperty(MAX_POLL_RECORDS_CONFIG, "100"); // 100 records per poll
props.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE));
// do not add any more time to window retention period, delete immidiately
props.setProperty(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, "0");
Run Code Online (Sandbox Code Playgroud)
即使非常大MAX_POLL_INTERVAL_MS_CONFIG,我们也会看到类似的错误(json 中的格式化异常):
{
"@timestamp": "2020-02-07T15:30:19.631Z",
"message": "[Consumer clientId=client-03a38ada-b39c-497a-acd4-aa95066fdc8a-StreamThread-6-consumer, groupId=group-name] Offset commit failed on partition group-name-repartition-3 at offset 9066: The coordinator is not aware of this member.",
"logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"level": "ERROR"
}
Run Code Online (Sandbox Code Playgroud)
我们还需要配置什么?是否还涉及其他参数?我不得不提的是,Kafka 代理是托管服务,我们不配置服务器端配置参数。另外,提交间隔设置为 10 秒。其他一切都是 KStreams 2.4.0 的默认值。
这个问题的另一个原因是没有发送心跳session.timeout.ms。所以也许你可以考虑增加这个。
heartbeat.interval.ms:使用 Kafka 的组管理工具时向消费者协调器发送心跳之间的预期时间。心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于 session.timeout.ms,但通常不应高于该值的 1/3。它可以调整得更低,以控制正常重新平衡的预期时间。
session.timeout.ms:使用 Kafka 的组管理工具时用于检测客户端故障的超时时间。客户端定期发送心跳以向代理表明其活跃度。如果在此会话超时到期之前代理未收到任何心跳,则代理将从组中删除该客户端并启动重新平衡。