节省大量时间

Sor*_*mar 2 apache-spark apache-spark-sql apache-spark-2.0

我有2个数据框,我想查找除2(surrogate_key,当前)以外的所有列均相等的记录

然后,我想用新的surrogate_key值保存这些记录。

以下是我的代码:

val seq = csvDataFrame.columns.toSeq
var exceptDF = csvDataFrame.except(csvDataFrame.as('a).join(table.as('b),seq).drop("surrogate_key","current"))
exceptDF.show()

exceptDF = exceptDF.withColumn("surrogate_key", makeSurrogate(csvDataFrame("name"), lit("ecc")))
exceptDF = exceptDF.withColumn("current", lit("Y"))

exceptDF.show()

exceptDF.write.option("driver","org.postgresql.Driver").mode(SaveMode.Append).jdbc(postgreSQLProp.getProperty("url"), tableName, postgreSQLProp)
Run Code Online (Sandbox Code Playgroud)

这段代码给出了正确的结果,但是在将这些结果写入postgre时却卡住了。

不确定是什么问题。还有没有更好的方法呢?

问候,Sorabh

nee*_*ani 5

默认情况下,spark-sql创建200个分区,这意味着当您尝试保存datafrmae时,它将被保存在200个实木复合地板文件中。您可以使用以下技术减少Dataframe的分区数量。

  1. 在应用程序级别。如下设置参数“ spark.sql.shuffle.partitions”:

sqlContext.setConf("spark.sql.shuffle.partitions", "10")

  1. 减少特定DataFrame的分区数量,如下所示:

df.coalesce(10).write.save(...)