Asked by Ole*_*nko

How do I use Google Cloud Storage as the checkpoint location in a streaming query?

I am trying to run Spark Structured Streaming jobs that save their checkpoints to Google Cloud Storage. I have several jobs: one without aggregations works perfectly, but the second one, which performs an aggregation, throws an exception. I found that people have hit similar checkpointing problems on S3 because S3 does not guarantee read-after-write semantics (https://blog.yuvalitzchakov.com/improving-spark-streaming-checkpoint-performance-with-aws-efs/), but GCS does provide that guarantee, so everything should be fine. I would be glad if anyone could share their experience with checkpointing.

import org.apache.spark.sql.streaming.Trigger

val writeToKafka = stream.writeStream
  .format("kafka")
  .trigger(Trigger.ProcessingTime(5000))
  .option("kafka.bootstrap.servers", "localhost:29092")
  .option("topic", "test_topic")
  .option("checkpointLocation", "gs://test/check_test/Job1")
  .start()
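For comparison, a minimal sketch of what the aggregating variant might look like (the source `stream`, the input topic, column names, and the `Job2` checkpoint path are hypothetical placeholders, not from the original post). An aggregating query additionally needs a watermark and an output mode other than plain append, and the Kafka sink requires a string `value` column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// Hypothetical sketch of the second (aggregating) job; names are placeholders.
val spark = SparkSession.builder().appName("agg-job").getOrCreate()
import spark.implicits._

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:29092")
  .option("subscribe", "input_topic")
  .load()

val counts = stream
  .select($"timestamp", $"value".cast("string").as("key"))
  .withWatermark("timestamp", "10 minutes")        // bounds the state kept for the aggregation
  .groupBy(window($"timestamp", "5 minutes"), $"key")
  .count()
  .select(to_json(struct($"*")).as("value"))       // the Kafka sink expects a "value" column

counts.writeStream
  .format("kafka")
  .outputMode("update")                            // aggregations cannot use plain append without a watermark
  .trigger(Trigger.ProcessingTime(5000))
  .option("kafka.bootstrap.servers", "localhost:29092")
  .option("topic", "test_topic")
  .option("checkpointLocation", "gs://test/check_test/Job2")  // each query needs its own checkpoint directory
  .start()
```

Note that every query must get its own checkpoint directory; reusing `gs://test/check_test/Job1` for the second job would corrupt the first job's state.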
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[Executor task launch worker for task 1] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Committed partition 0 (task 1, attempt 0, stage 1.0)
[Executor task launch worker for task 1] …
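One thing worth checking when a `gs://` checkpoint location misbehaves: outside Dataproc (where the Cloud Storage connector is preinstalled) the connector and its Hadoop settings must be supplied explicitly. A hedged sketch of that configuration, assuming service-account authentication (the key-file path is a placeholder):

```shell
# Hypothetical spark-defaults.conf / --conf fragment for the GCS connector.
# On Dataproc these are already set; elsewhere the connector jar must be on the classpath.
spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
spark.hadoop.google.cloud.auth.service.account.enable=true
spark.hadoop.google.cloud.auth.service.account.json.keyfile=/path/to/key.json
```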

google-cloud-storage google-cloud-platform google-cloud-dataproc spark-structured-streaming

6
Votes
1
Answer
700
Views