在我的 Spark 工作中,我尝试覆盖结构化流的每个微批次中的一个表
batchDF.write.mode(SaveMode.Overwrite).saveAsTable("mytable")
Run Code Online (Sandbox Code Playgroud)
它产生了以下错误。
Can not create the managed table('`mytable`'). The associated location('file:/home/ec2-user/environment/spark/spark-local/spark-warehouse/mytable') already exists.;
Run Code Online (Sandbox Code Playgroud)
我知道在 Spark 2.xx 中,解决这个问题的方法是添加以下选项。
spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")
Run Code Online (Sandbox Code Playgroud)
它在 Spark 2.xx 中运行良好。不过,这个选项在 Spark 3.0.0 中被删除了。那么,在Spark 3.0.0中我们应该如何解决这个问题呢?
谢谢!
基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html。应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。
如何为 Spark 结构化流指定 kafka 消费者的组 ID?
如何通过 Spark Structured Streaming 确保 kafka 数据摄取不会丢失数据?
但是,我在 spark 3.0 中尝试了如下设置。
package com.example
/**
* @author ${user.name}
*/
import scala.math.random
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import …Run Code Online (Sandbox Code Playgroud) scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration