小编kop*_*aka的帖子

在 Golang 中写入文件

我对 Golang 比较陌生,还不确定如何使用某些语言结构。目前我有以下代码(带有测试调试输出),它没有提供预期的结果:

    json, _ := json.Marshal(struct)
    fmt.Println(json)

    f,_ := os.Create(fmt.Sprintf("/tmp/%s.json", "asd"))

    i,_ := f.Write(json)
    fmt.Println(i)  

    b, err := ioutil.ReadAll(f)
    fmt.Print(b)
Run Code Online (Sandbox Code Playgroud)

我期望以下行为:

  • 将结构转换为字节数组
  • 创建一个新文件
  • 将字节数组附加到文件

但是,当我在我的环境 (AWS Lambda) 中运行代码以及在 Golang Playground 中使用它时,该文件始终为空。上面代码的输出如下所示:

[123 34 ... <hug array of bytes>]
1384
[]
Run Code Online (Sandbox Code Playgroud)

这让我相信我f.Write()没有正确使用,尽管我遵循了包文档。所有其他输出都表示预期的行为,那么我的错误是什么?我对使用该File界面有些限制,否则我会使用ioutil.WriteFile(). 我的假设是在某些时候对指针/值的误解,但编译器阻止使用&f.

go

4
推荐指数
1
解决办法
270
查看次数

Flink 检查点不断失败(等待 InitProducerId 时超时)

在深入研究了许多 SO 帖子甚至 JIRA 问题后,我不知道该去哪里了。Flink 中的每个检查点都会因超时而失败,在作业的异常部分中,它显示以下错误,但作业本身不会失败:

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 600000milliseconds while awaiting InitProducerId
Run Code Online (Sandbox Code Playgroud)

禁用检查点时,有关 Kafka 的所有内容都会按预期工作,因此我的假设是它可能与等待 Kafka 提交以便被确认的检查点有关(Semantic设置为EXACTLY_ONCE)。我记得读过有关超时不匹配导致问题的文章,因此我将TRANSACTION_TIMEOUT_CONFIGFlinkKafkaProducer 中的 调整为900000毫秒。

我还按照本期中的建议调整了 TransactionTimeout 和 MaxBlockMS,该期目前有很多关于这个完全相同的错误的讨论,但显然没有解决方案。

Flink 书《Stream Handling with Apache Flink》建议仔细修改 Kafka 配置,例如ackslog.flush.interval.messageslog.flush.interval.mslog.flush.*。我们已经在 Flink 1.9.1 下使用了这个功能,但自从我们升级到 1.11.1 后,它就不再工作了。我不知道是否有人同时接触过 Kafka 设置,但据我所知,除了log.flush.interval=10000. 我们像以前一样使用 Confluence 5.3.3,这意味着 Kafka 2.3.1

此外,Flink 作业部署在单节点环境中,因此它应该能够访问文件系统,整个目录由运行 Flink 服务的用户拥有(这是另一个 SO 线程中建议的解决方案)。

有人知道是什么原因导致这些检查点失败吗?

apache-kafka apache-flink

2
推荐指数
1
解决办法
4897
查看次数

标签 统计

apache-flink ×1

apache-kafka ×1

go ×1