kop*_*aka 2 apache-kafka apache-flink
在深入研究了许多 SO 帖子甚至 JIRA 问题后,我不知道该去哪里了。Flink 中的每个检查点都会因超时而失败,在作业的异常部分中,它显示以下错误,但作业本身不会失败:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 600000milliseconds while awaiting InitProducerId
Run Code Online (Sandbox Code Playgroud)
禁用检查点时,有关 Kafka 的所有内容都会按预期工作,因此我的假设是它可能与等待 Kafka 提交以便被确认的检查点有关(Semantic
设置为EXACTLY_ONCE
)。我记得读过有关超时不匹配导致问题的文章,因此我将TRANSACTION_TIMEOUT_CONFIG
FlinkKafkaProducer 中的 调整为900000
毫秒。
我还按照本期中的建议调整了 TransactionTimeout 和 MaxBlockMS,该期目前有很多关于这个完全相同的错误的讨论,但显然没有解决方案。
Flink 书《Stream Handling with Apache Flink》建议仔细修改 Kafka 配置,例如acks
、log.flush.interval.messages
、log.flush.interval.ms
和log.flush.*
。我们已经在 Flink 1.9.1 下使用了这个功能,但自从我们升级到 1.11.1 后,它就不再工作了。我不知道是否有人同时接触过 Kafka 设置,但据我所知,除了log.flush.interval=10000
. 我们像以前一样使用 Confluence 5.3.3
,这意味着 Kafka 2.3.1
。
此外,Flink 作业部署在单节点环境中,因此它应该能够访问文件系统,整个目录由运行 Flink 服务的用户拥有(这是另一个 SO 线程中建议的解决方案)。
有人知道是什么原因导致这些检查点失败吗?
在对此感到非常头疼之后,我终于发现了问题:Kafka 设置实际上被transaction.state.log.replication.factor
更改了,因为低于transaction.state.log.min.isr
,导致没有事务真正成功,因为 Kafka 主题的同步副本永远不够。
归档时间: |
|
查看次数: |
4897 次 |
最近记录: |