Writing SQL Server datetime data types from Scala/Spark

jym*_*mbo — tags: sql-server, scala, apache-spark, databricks

I am trying to bulk insert into a SQL Server table from a Databricks notebook, using an approach similar to this one:

Bulk copy to Azure SQL Database or SQL Server

This works fine until I try to write to a column of type datetime. The table I am writing to has the following schema:

create table raw.HubDrg_TEST
(
  DrgKey varchar(64) not null,
  LoadDate datetime,
  LoadProcess varchar(255),
  RecordSource varchar(255),
  DrgCode varchar(255)
)

My Scala code is as follows:

//Get dataset for data in staging table
var stagedData: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select distinct CodeID as DrgCode, getdate() as LoadDate from StageMeditech.livendb_dbo_DAbsDrgs").load() 

//Get dataset for data in existing Hub
val existingHub: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "Select DrgKey as ExistingDrgKey from raw.HubDrg_TEST")
  .load()

//Requires: import java.math.BigInteger and java.security.MessageDigest
val sha_256 = udf((s: String) => { String.format("%032x", new BigInteger(1, MessageDigest.getInstance("SHA-256").digest(s.getBytes("UTF-8")))) })

//Add additional columns
stagedData = stagedData
  .withColumn("DrgKey", sha_256(col("DrgCode")))
  .withColumn("LoadProcess", lit("TestLoadProcess"))
  .withColumn("RecordSource", lit("TestRecordSource"))
//Join and filter out existing hub records
val dff = stagedData.join(existingHub, col("DrgKey")===col("ExistingDrgKey"), "left_outer").filter(existingHub.col("ExistingDrgKey").isNull).drop("ExistingDrgKey") 

//Bulk insert
val bulkCopyConfig = Config(Map( 
"url" -> dwServer, 
"databaseName" -> dwDatabase, 
"user" -> dwUser, 
"password" -> dwPass, 
"dbTable" -> "raw.HubDrg_TEST", 
"bulkCopyBatchSize" -> "2000", 
"bulkCopyTableLock" -> "false", 
"bulkCopyTimeout" -> "0" 
)) 

dff.bulkCopyToSqlDB(bulkCopyConfig) 
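(A side note on the hashing UDF, unrelated to the datetime error: `%032x` only pads to 32 characters, so a digest whose leading bytes are zero would produce a key shorter than 64 characters. A padded variant, shown here outside of Spark, would be:)

```scala
import java.math.BigInteger
import java.security.MessageDigest

// Hex-encode a SHA-256 digest. "%064x" left-pads with zeros so the
// result is always 64 characters; "%032x" (as in the UDF above) can
// yield fewer when the digest has leading zero bytes.
def sha256Hex(s: String): String =
  String.format("%064x",
    new BigInteger(1, MessageDigest.getInstance("SHA-256").digest(s.getBytes("UTF-8"))))

println(sha256Hex("390"))
```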

The problem I am seeing is that the datetime value I select as getdate() as LoadDate gives me this error when I try to insert into the table above:

    SqlNativeBufferBufferBulkCopy.WriteTdsDataToServer, error in OdbcDone: SqlState: 42000, NativeError: 4816, 'Error calling: bcp_done(this->GetHdbc()) | SQL Error Info: SrvrMsgState: 1, SrvrSeverity: 16, Error <1>: ErrorMsg: [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column type from bcp client for colid 2. | Error calling: pConn->Done() | state: FFFF, number: 58673, active connections: 9', Connection String: Driver={pdwodbc17e};app=TypeC01-DmsNativeWriter:DB66\mpdwsvc (13056)-ODBC;trusted_connection=yes;autotranslate=no;server=\\.\pipe\DB.66-a313018f1e5b\sql\query;database=Distribution_15

Even when I drop the datetime value from the SQL Server query and instead set LoadDate with Spark's built-in current_timestamp function, i.e. withColumn("LoadDate", current_timestamp()), it still fails.
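One workaround I am considering (untested, and it sidesteps rather than answers the question) is to hand SQL Server the timestamp as a string in a shape that datetime converts implicitly, e.g. `withColumn("LoadDate", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss.SSS"))`. Outside of Spark, the formatting itself looks like:

```scala
import java.sql.Timestamp
import java.time.format.DateTimeFormatter

// Render a JVM timestamp as "yyyy-MM-dd HH:mm:ss.SSS" — three
// fractional digits, which SQL Server's datetime type can hold.
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
val ts  = Timestamp.valueOf("2019-07-02 09:05:00.123")
val rendered = ts.toLocalDateTime.format(fmt)
println(rendered) // 2019-07-02 09:05:00.123
```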

I found this Stack Overflow post describing a similar problem, but it still does not answer the question. Does anyone have a good example of inserting datetime data types into a SQL Server table using the com.microsoft.azure.sqldb.spark.bulkcopy._ library?

Here is a sample of the data from running dff.show():

    +-------+--------------------+--------------------+---------------+----------------+
    |DrgCode|            LoadDate|              DrgKey|    LoadProcess|    RecordSource|
    +-------+--------------------+--------------------+---------------+----------------+
    |    390|2019-07-02 09:05:...|48a1a756f2d83f1dc...|TestLoadProcess|TestRecordSource|
    |     18|2019-07-02 09:05:...|4ec9599fc203d176a...|TestLoadProcess|TestRecordSource|
    |    481|2019-07-02 09:05:...|51d089cdaf0c968c9...|TestLoadProcess|TestRecordSource|
    |    460|2019-07-02 09:05:...|841a05fd378a2c067...|TestLoadProcess|TestRecordSource|
    |    838|2019-07-02 09:05:...|cef5838d118dccd9d...|TestLoadProcess|TestRecordSource|
    |     61|2019-07-02 09:05:...|d029fa3a95e174a19...|TestLoadProcess|TestRecordSource|
    |    807|2019-07-02 09:05:...|fce86e339dc3131c4...|TestLoadProcess|TestRecordSource|
    |     44|2019-07-02 09:05:...|71ee45a3c0db9a986...|TestLoadProcess|TestRecordSource|
    |    267|2019-07-02 09:05:...|8acc23987b8960d83...|TestLoadProcess|TestRecordSource|
    |    222|2019-07-02 09:05:...|9b871512327c09ce9...|TestLoadProcess|TestRecordSource|
    |    934|2019-07-02 09:05:...|a8443b1426652157e...|TestLoadProcess|TestRecordSource|
    |    677|2019-07-02 09:05:...|2782526eaa0c5c254...|TestLoadProcess|TestRecordSource|
    |    701|2019-07-02 09:05:...|290a0b92873bdf4e4...|TestLoadProcess|TestRecordSource|
    |    441|2019-07-02 09:05:...|2dfe70c43208f52b9...|TestLoadProcess|TestRecordSource|
    |    439|2019-07-02 09:05:...|50a010ce24d089605...|TestLoadProcess|TestRecordSource|
    |    883|2019-07-02 09:05:...|3055e0d8130c7a197...|TestLoadProcess|TestRecordSource|
    |    947|2019-07-02 09:05:...|4d0198f4905a08812...|TestLoadProcess|TestRecordSource|
    |    369|2019-07-02 09:05:...|5f193b350c8aba488...|TestLoadProcess|TestRecordSource|
    |     21|2019-07-02 09:05:...|6f4b6612125fb3a0d...|TestLoadProcess|TestRecordSource|
    |    503|2019-07-02 09:05:...|7182dd431b5c8833e...|TestLoadProcess|TestRecordSource|
    +-------+--------------------+--------------------+---------------+----------------+
    only showing top 20 rows

dff:org.apache.spark.sql.DataFrame
DrgCode:string
LoadDate:timestamp
DrgKey:string
LoadProcess:string
RecordSource:string