Spark:优化将DataFrame写入SQL Server

Daw*_*wid 9 sql database sql-server scala apache-spark

我正在使用下面的代码向SQL Server的表中写入43列和大约2,000,000行的DataFrame:

dataFrame
  .write
  .format("jdbc")
  .mode("overwrite")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", url)
  .option("dbtable", tablename)
  .option("user", user)
  .option("password", password)
  .save()
Run Code Online (Sandbox Code Playgroud)

不幸的是,尽管它确实适用于小型DataFrame,但它要么非常慢,要么对于大型DataFrame超时。关于如何优化它的任何提示?

我尝试设置 rewriteBatchedStatements=true

谢谢。

Shu*_*Shu 6

尝试batchsize至少在您的语句中添加选项> 10000(相应地更改此值以获得更好的性能),然后再次执行写入。

从Spark文档:

JDBC批处理大小,它确定每次往返要插入多少行。这可以帮助提高JDBC驱动程序的性能。此选项仅适用于写作。它默认为1000

同样值得一试:

  • numPartitions option 增加并行性(这也确定了并发JDBC连接的最大数量)

  • queryTimeout option 增加写选项的超时。


Jas*_*r-M 6

我们求助于使用azure-sqldb-spark库,而不是使用Spark的默认内置导出功能。这个库给你一个bulkCopyToSqlDB这是一个方法真正的批量插入,去了很多更快。它比内置功能使用起来不实用,但以我的经验仍然值得。

我们或多或少地像这样使用它:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._

val options = Map(
  "url"          -> "***",
  "databaseName" -> "***",
  "user"         -> "***",
  "password"     -> "***",
  "driver"       -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)

// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)

val bulkConfig = Config(options ++ Map(
  "dbTable"           -> "myTable",
  "bulkCopyBatchSize" -> "20000",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkConfig)
Run Code Online (Sandbox Code Playgroud)

如您所见,我们CREATE TABLE自己生成查询。您可以让该库创建表,但这样做dataFrame.limit(0).write.sqlDB(config)仍然会很低效,可能需要您缓存自己的DataFrame,并且不允许您选择SaveMode

也可能很有趣:ExclusionRule在将此库添加到sbt构建中时,我们必须使用an ,否则assembly任务将失败。

libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
  ExclusionRule(organization = "org.apache.spark")
)
Run Code Online (Sandbox Code Playgroud)