DataFrame to Oracle creates table with case-sensitive columns

use*_*907 1 apache-spark apache-spark-sql

Spark: 2.1.1

I am saving a DataFrame to an Oracle table, but the resulting Oracle table has "case-sensitive" columns.

val properties = new java.util.Properties
properties.setProperty("user", ora_username)
properties.setProperty("password", ora_pwd)
properties.setProperty("batchsize", "30000")
properties.setProperty("driver", db_driver)
spark.sql("select * from myTable").repartition(50).write.mode(SaveMode.Overwrite).jdbc(url,"myTable_oracle", properties)

When I query the table in Oracle:

  1. Select * from myTable_oracle; => works
  2. Select col1 from myTable_oracle; => does not work
  3. Select "col1" from myTable_oracle; => works, but is annoying

I tried the setting below, but the problem persists:

spark.sqlContext.sql("set spark.sql.caseSensitive=false")

The same code worked with Spark 1.6.1 and created Oracle tables with case-insensitive columns, but with Spark 2.1.1 I am facing this issue.

use*_*907 6

I found the cause and the solution: starting with Spark 2.x, every column name is double-quoted while creating the table, so the column names of the generated Oracle table become case-sensitive when you try to query them via SQL*Plus.

The quoting happens in dialect.quoteIdentifier:
https://github.com/apache/spark/blob/branch-2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L645

and the default dialect.quoteIdentifier wraps the column name in double quotes ["]:

  def quoteIdentifier(colName: String): String = {
    s""""$colName""""
  }

https://github.com/apache/spark/blob/branch-2.1/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala#L90
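
You can see the effect directly from the registered dialect (a minimal sketch, assuming the Oracle thin JDBC URL used below; the column name is just an example). Because Oracle treats quoted identifiers as case-sensitive, the unquoted Select col1 no longer matches the "col1" column that was created.

import org.apache.spark.sql.jdbc.JdbcDialects

// Example Oracle thin URL, only used here to look up the registered dialect.
val url = "jdbc:oracle:thin:@HOST:1567/SID"

// The dialect Spark picks for this URL quotes every identifier,
// so the generated DDL contains "col1" rather than col1.
val quoted = JdbcDialects.get(url).quoteIdentifier("col1")
println(quoted)   // prints "col1" (including the double quotes)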

Solution: unregister the existing OracleDialect and re-register it with dialect.quoteIdentifier overridden, along with the other pieces the Oracle dialect needs to work:

import java.sql.Types
import org.apache.spark.sql.types._
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.{ JdbcDialects, JdbcType, JdbcDialect }


val url = "jdbc:oracle:thin:@HOST:1567/SID"

val dialect = JdbcDialects
JdbcDialects.unregisterDialect(dialect.get(url))

val OracleDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")

  override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // Handle NUMBER fields that have no precision/scale in a special way, because JDBC ResultSetMetaData converts this to 0 precision and -127 scale
    if (sqlType == Types.NUMERIC && size == 0) {
      // This is sub-optimal as we have to pick a precision/scale in advance whereas the data in Oracle is allowed 
      //  to have different precision/scale for each value.  This conversion works in our domain for now though we 
      //  need a more durable solution.  Look into changing JDBCRDD (line 406):
      //    FROM:  mutableRow.update(i, Decimal(decimalVal, p, s))
      //    TO:  mutableRow.update(i, Decimal(decimalVal))
      Some(DecimalType(DecimalType.MAX_PRECISION, 10))
    } // Handle Timestamp with timezone (for now we are just converting this to a string with default format)
    //else if (sqlType == -101) {
    // Some(StringType)
    // } 
    else None
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType            => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR))
    case BooleanType           => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
    case IntegerType           => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
    case LongType              => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
    case DoubleType            => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
    case FloatType             => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
    case ShortType             => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
    case ByteType              => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
    case BinaryType            => Some(JdbcType("BLOB", java.sql.Types.BLOB))
    case TimestampType         => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP))
    case DateType              => Some(JdbcType("DATE", java.sql.Types.DATE))
    //case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    //case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
    case _                     => None
  }

  // Important from Spark 2.0 onwards: without this override, the Oracle table columns would be case-sensitive
  override def quoteIdentifier(colName: String): String = {
    colName
  }

}

JdbcDialects.registerDialect(OracleDialect)
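
With the custom dialect registered, re-running the write from the question (a sketch reusing the url, properties and spark session defined above) creates the table with unquoted, case-insensitive column names:

import org.apache.spark.sql.SaveMode

spark.sql("select * from myTable")
  .repartition(50)
  .write
  .mode(SaveMode.Overwrite)
  .jdbc(url, "myTable_oracle", properties)

// In SQL*Plus both of these should now work:
//   Select col1 from myTable_oracle;
//   Select * from myTable_oracle;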