小编yyu*_*nkm的帖子

Spark 3.0出现以下问题如何解决?无法创建托管表。关联位置已存在。;

在我的 Spark 工作中,我尝试覆盖结构化流的每个微批次中的一个表

batchDF.write.mode(SaveMode.Overwrite).saveAsTable("mytable")
Run Code Online (Sandbox Code Playgroud)

它产生了以下错误。

  Can not create the managed table('`mytable`'). The associated location('file:/home/ec2-user/environment/spark/spark-local/spark-warehouse/mytable') already exists.;

Run Code Online (Sandbox Code Playgroud)

我知道在 Spark 2.xx 中,解决这个问题的方法是添加以下选项。

spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")
Run Code Online (Sandbox Code Playgroud)

它在 Spark 2.xx 中运行良好。不过,这个选项在 Spark 3.0.0 中被删除了。那么,在Spark 3.0.0中我们应该如何解决这个问题呢?

谢谢!

apache-spark spark-streaming spark3

6
推荐指数
1
解决办法
7850
查看次数

如何在 spark 3.0 结构化流中使用 kafka.group.id 和检查点以继续从重启后停止的 Kafka 读取?

基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html。应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。

如何为 Spark 结构化流指定 kafka 消费者的组 ID?

如何通过 Spark Structured Streaming 确保 kafka 数据摄取不会丢失数据?

但是,我在 spark 3.0 中尝试了如下设置。

package com.example

/**
 * @author ${user.name}
 */
import scala.math.random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration

3
推荐指数
1
解决办法
1159
查看次数