如何使用Scala DataFrameReader选项方法

jlp*_*jlp 7 scala apache-spark

Scala DataFrameReader有一个函数"option",它具有以下签名:

 def  option(key: String, value: String): DataFrameReader 
   // Adds an input option for the underlying data source. 
Run Code Online (Sandbox Code Playgroud)

那么什么是底层数据源的"输入选项",有人可以在这里分享一个如何使用这个函数的例子吗?

Dan*_*bos 6

可用选项的列表因文件格式而异。它们记录在DataFrameReaderAPI文档中

例如:

def json(paths: String*): DataFrame

加载JSON文件(每行一个对象),并将结果作为DataFrame返回。

此函数一次检查输入以确定输入模式。如果您事先知道该架构,请使用指定该架构的版本以避免额外的扫描。

您可以设置以下特定于JSON的选项来处理非标准JSON文件:

  • primitivesAsString(默认false):将所有原始值推断为字符串类型
  • prefersDecimal(默认false):将所有浮点值推断为十进制类型。如果这些值不适合小数,则将其推断为双精度。
  • allowComments(默认false):忽略JSON记录中的Java / C ++样式注释
  • allowUnquotedFieldNames(默认false):允许不带引号的JSON字段名称
  • allowSingleQuotes(默认true):除双引号外还允许单引号
  • allowNumericLeadingZeros(默认false):允许数字前导零(例如00012)
  • allowBackslashEscapingAnyCharacter(默认false):允许使用反斜杠引用机制接受所有字符的引用
  • mode(默认PERMISSIVE):允许一种在解析期间处理损坏记录的模式。
    • PERMISSIVEnull在遇到损坏的记录时将其他字段设置为,然后将格式错误的字符串放入由配置的新字段中columnNameOfCorruptRecord。由用户设置架构时,它将设置其他null字段。
    • DROPMALFORMED:忽略整个损坏的记录。
    • FAILFAST:遇到损坏的记录时将引发异常。
  • columnNameOfCorruptRecord(默认为中指定的值spark.sql.columnNameOfCorruptRecord):允许重命名由PERMISSIVEmode 创建的格式错误的字符串的新字段。这将覆盖spark.sql.columnNameOfCorruptRecord


Art*_*lev 3

火花源代码

  def option(key: String, value: String): DataFrameReader = {
    this.extraOptions += (key -> value)
    this
  }
Run Code Online (Sandbox Code Playgroud)

其中extraOptionsaMap和 的用法如下:

private def jdbc(
  url: String,
  table: String,
  parts: Array[Partition],
  connectionProperties: Properties): DataFrame = {
  val props = new Properties()
  // THIS
  extraOptions.foreach { case (key, value) =>
    props.put(key, value)
  }
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)
  val relation = JDBCRelation(url, table, parts, props)(sqlContext)
  sqlContext.baseRelationToDataFrame(relation)
}
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,它只是一种将附加属性传递给jdbc驱动程序的方法。

还有更通用的options方法来传递Map而不是单个键值,它的用法示例在Spark 文档中:

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()
Run Code Online (Sandbox Code Playgroud)

  • 我不知道。我提出这个问题希望看到对选项列表的引用。如果有这样的清单吗? (4认同)