Daw*_*wid 9 sql database sql-server scala apache-spark
我正在使用下面的代码向SQL Server的表中写入43列和大约2,000,000行的DataFrame:
dataFrame
.write
.format("jdbc")
.mode("overwrite")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", url)
.option("dbtable", tablename)
.option("user", user)
.option("password", password)
.save()
Run Code Online (Sandbox Code Playgroud)
不幸的是,尽管它确实适用于小型DataFrame,但它要么非常慢,要么对于大型DataFrame超时。关于如何优化它的任何提示?
我尝试设置 rewriteBatchedStatements=true
谢谢。
我们求助于使用azure-sqldb-spark库,而不是使用Spark的默认内置导出功能。这个库给你一个bulkCopyToSqlDB这是一个方法真正的批量插入,去了很多更快。它比内置功能使用起来不实用,但以我的经验仍然值得。
我们或多或少地像这样使用它:
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._
val options = Map(
"url" -> "***",
"databaseName" -> "***",
"user" -> "***",
"password" -> "***",
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)
// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)
val bulkConfig = Config(options ++ Map(
"dbTable" -> "myTable",
"bulkCopyBatchSize" -> "20000",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.bulkCopyToSqlDB(bulkConfig)
Run Code Online (Sandbox Code Playgroud)
如您所见,我们CREATE TABLE自己生成查询。您可以让该库创建表,但这样做dataFrame.limit(0).write.sqlDB(config)仍然会很低效,可能需要您缓存自己的DataFrame,并且不允许您选择SaveMode。
也可能很有趣:ExclusionRule在将此库添加到sbt构建中时,我们必须使用an ,否则assembly任务将失败。
libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
ExclusionRule(organization = "org.apache.spark")
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1794 次 |
| 最近记录: |