Spark createTableColumnTypes 不会导致用户提供的架构

hor*_*01d 6 jdbc apache-spark

不知道为什么这不起作用,但我只是尝试应用下面的内容,但仍然获得表 (mysql) 的 spark 模式版本,其中包含text我试图指定的 varchar(128) 而不是 varchar(128)。尝试使用 jdbc write 为我的列创建自定义数据类型。尝试使用 spark 2.1.0:

  def df2DB(
    df: DataFrame,
    batchSize: Int,
    numPartitions: Int,
    database: String,
    table: String): Unit = {

    val mdb = new MetadataBuilder()
    mdb.putString("col1", "INT")
    mdb.putString("col2", "VARCHAR(128)")
    mdb.putString("col3", "VARCHAR(128)")
    val createTableColTypes = mdb.build().json

    df.write.format("jdbc")
      .option("createTableColumnTypes", createTableColTypes)
      .option("url", url)
      .option("dbtable", s"${database}.${table}")
      .option("user", user)
      .option("password", pass)
      .option("driver", driver)
      .option("batchsize", batchSize)
      .option("numPartitions", numPartitions)
      .save()
  }
Run Code Online (Sandbox Code Playgroud)

我也尝试过这种格式但没有成功:

df.write.format("jdbc")
  .mode(SaveMode.Overwrite)
  .option("url", url)
  .option("dbtable", s"${database}.${table}")
  .option("user", user)
  .option("password", pass)
  .option("driver", driver)
  .option("batchsize", batchSize)
  .option("numPartitions", numPartitions)
  .option("createTableColumnTypes", "COL1 INT, COL2 VARCHAR(128)" )
  .save()
Run Code Online (Sandbox Code Playgroud)

此外,即使我尝试这样使用,我也会createTableOptions收到 sql 语法错误。我没有找到任何一起或单独使用这些选项的好例子:

.option("createTableOptions", "CREATE TABLE tbl1 (col1 int, col2 VARCHAR(128))").save()
Run Code Online (Sandbox Code Playgroud)

vat*_*ada 1

不要将列名创建为数据类型的 JSON,而是尝试使用逗号分隔的列名列表和数据类型,如下所示:

def df2DB(
    df: DataFrame,
    batchSize: Int,
    numPartitions: Int,
    database: String,
    table: String): Unit = {

    df.write.format("jdbc")
      .option("createTableColumnTypes", "col1 INT, col2 VARCHAR(128), col3 VARCHAR(128)")
      .option("url", url)
      .option("dbtable", s"${database}.${table}")
      .option("user", user)
      .option("password", pass)
      .option("driver", driver)
      .option("batchsize", batchSize)
      .option("numPartitions", numPartitions)
      .save()
  }
Run Code Online (Sandbox Code Playgroud)

参考: https: //github.com/apache/spark/blob/aa4cf2b19e4cf5588af7e2192e0e9f687cd84bc5/examples/src/main/python/sql/datasource.py#L210