Spark: 2.1.1

I am saving a DataFrame as an Oracle table, but the generated Oracle table ends up with case-sensitive columns.
val properties = new java.util.Properties
properties.setProperty("user", ora_username)
properties.setProperty("password", ora_pwd)
properties.setProperty("batchsize", "30000")
properties.setProperty("driver", db_driver)
spark.sql("select * from myTable").repartition(50).write.mode(SaveMode.Overwrite).jdbc(url, "myTable_oracle", properties)
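For reference, the elided connection values above would look something like the following; these are hypothetical placeholders rather than values from the original post:

val ora_username = "SCOTT"                          // hypothetical user
val ora_pwd = "secret"                              // hypothetical password
val db_driver = "oracle.jdbc.driver.OracleDriver"   // Oracle thin JDBC driver class
val url = "jdbc:oracle:thin:@HOST:1567/SID"         // same URL form as used later in this post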
When I look at the table in Oracle:

Select * from myTable_oracle;       => works
Select col1 from myTable_oracle;    => does not work
Select "col1" from myTable_oracle;  => works, but is annoying

The unquoted query fails because Oracle folds unquoted identifiers to uppercase (COL1), which no longer matches the quoted lowercase "col1" the table was created with. I tried the setting below, but the problem remains:
spark.sqlContext.sql("set spark.sql.caseSensitive=false")
The same code used to work on Spark 1.6.1, creating Oracle tables with case-insensitive columns, but on Spark 2.1.1 I am facing this problem. (Note that spark.sql.caseSensitive only controls how Spark's own analyzer resolves column names; it has no effect on the DDL the JDBC writer generates.)
I found the problem and the solution: starting with Spark 2.x, every column name gets double-quoted while the table is created, which is why the generated Oracle table's column names are case-sensitive when you query them through SQL*Plus. The quoting happens in dialect.quoteIdentifier (https://github.com/apache/spark/blob/branch-2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L645), and the default dialect.quoteIdentifier wraps the column name in double quotes ("):
def quoteIdentifier(colName: String): String = {
  s""""$colName""""
}
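You can confirm this behavior from a Spark shell before registering anything; a minimal check, assuming the same Oracle URL as in the solution below:

import org.apache.spark.sql.jdbc.JdbcDialects

// The dialect Spark 2.x resolves for an Oracle URL inherits the default
// quoteIdentifier, which wraps every column name in double quotes.
val defaultDialect = JdbcDialects.get("jdbc:oracle:thin:@HOST:1567/SID")
println(defaultDialect.quoteIdentifier("col1"))   // prints: "col1"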
Solution: unregister the existing OracleDialect and register a replacement that overrides dialect.quoteIdentifier, together with the other pieces needed for the Oracle dialect to work:
import java.sql.Types
import org.apache.spark.sql.types._
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}

val url = "jdbc:oracle:thin:@HOST:1567/SID"

// Unregister the dialect Spark currently resolves for this URL (the built-in OracleDialect).
JdbcDialects.unregisterDialect(JdbcDialects.get(url))

val OracleDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:oracle") || url.contains("oracle")

  override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // Handle NUMBER fields that have no precision/scale in a special way, because JDBC
    // ResultSetMetaData converts them to 0 precision and -127 scale.
    if (sqlType == Types.NUMERIC && size == 0) {
      // This is sub-optimal as we have to pick a precision/scale in advance, whereas the data in
      // Oracle is allowed to have a different precision/scale for each value. This conversion works
      // in our domain for now, though we need a more durable solution. Look into changing JDBCRDD (line 406):
      //   FROM: mutableRow.update(i, Decimal(decimalVal, p, s))
      //   TO:   mutableRow.update(i, Decimal(decimalVal))
      Some(DecimalType(DecimalType.MAX_PRECISION, 10))
    }
    // Handle TIMESTAMP WITH TIME ZONE (for now we would just convert it to a string with the default format).
    // else if (sqlType == -101) {
    //   Some(StringType)
    // }
    else None
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType    => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR))
    case BooleanType   => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
    case IntegerType   => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
    case LongType      => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
    case DoubleType    => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
    case FloatType     => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
    case ShortType     => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
    case ByteType      => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
    case BinaryType    => Some(JdbcType("BLOB", java.sql.Types.BLOB))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP))
    case DateType      => Some(JdbcType("DATE", java.sql.Types.DATE))
    // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    // case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
    case _ => None
  }

  // Important from Spark 2.0 on: without this override the Oracle table columns would be case-sensitive.
  override def quoteIdentifier(colName: String): String = {
    colName
  }
}

JdbcDialects.registerDialect(OracleDialect)
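With the custom dialect registered, re-running the original write produces a table whose columns can be queried without quotes; a minimal sketch reusing the names from above:

// The write itself is unchanged; only the registered dialect differs,
// so the generated DDL no longer double-quotes the column names.
spark.sql("select * from myTable")
  .repartition(50)
  .write
  .mode(SaveMode.Overwrite)
  .jdbc(url, "myTable_oracle", properties)

// In SQL*Plus, both of these now work:
//   Select * from myTable_oracle;
//   Select col1 from myTable_oracle;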