Gal*_*Gal 9 scala apache-spark apache-spark-dataset
I have the following case class:
case class User(userId: String)
and the following table schema:
+--------------------+------------------+
| col_name| data_type|
+--------------------+------------------+
| user_id| string|
+--------------------+------------------+
When I try to convert the DataFrame to a typed Dataset[User] with spark.read.table("MyTable").as[User], I get an error about mismatched field names:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
cannot resolve ''`user_id`' given input columns: [userId];;
Is there any simple way to solve this without breaking Scala idiom and naming my field user_id? Of course, my real table has many more fields, and I have many more case classes/tables, so manually defining an Encoder for every case class is not feasible (and I don't know macros well enough for that to be an option, though I'd happily use one if it exists!).
I feel like I'm missing a very obvious "convert snake_case to camelCase = true" option, since one exists in almost every ORM I've ever used.
scala> val df = Seq(("Eric" ,"Theodore", "Cartman"), ("Butters", "Leopold", "Stotch")).toDF.select(concat($"_1", lit(" "), ($"_2")) as "first_and_middle_name", $"_3" as "last_name")
df: org.apache.spark.sql.DataFrame = [first_and_middle_name: string, last_name: string]
scala> df.show
+---------------------+---------+
|first_and_middle_name|last_name|
+---------------------+---------+
| Eric Theodore| Cartman|
| Butters Leopold| Stotch|
+---------------------+---------+
scala> val ccnames = df.columns.map(sc => {val ccn = sc.split("_")
| (ccn.head +: ccn.tail.map(_.capitalize)).mkString
| })
ccnames: Array[String] = Array(firstAndMiddleName, lastName)
scala> df.toDF(ccnames: _*).show
+------------------+--------+
|firstAndMiddleName|lastName|
+------------------+--------+
| Eric Theodore| Cartman|
| Butters Leopold| Stotch|
+------------------+--------+
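The per-column mapping above can also be pulled out into a plain function, which makes the intent clearer and is trivial to unit-test. A minimal sketch (the helper name snakeToCamel is mine, not from the transcript):

```scala
// Convert one snake_case identifier to camelCase.
// Same logic as the map over df.columns above: keep the first
// segment as-is, capitalize the rest, and join.
def snakeToCamel(sc: String): String = {
  val parts = sc.split("_")
  (parts.head +: parts.tail.map(_.capitalize)).mkString
}
```

With camelCase column names in place, df.toDF(df.columns.map(snakeToCamel): _*).as[User] should resolve the fields without a custom Encoder.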
EDIT: Does this help? Define a function that takes a loader: String => DataFrame and a path: String.
scala> val parquetloader = spark.read.parquet _
parquetloader: String => org.apache.spark.sql.DataFrame = <function1>
scala> val tableloader = spark.read.table _
tableloader: String => org.apache.spark.sql.DataFrame = <function1>
scala> val textloader = spark.read.text _
textloader: String => org.apache.spark.sql.DataFrame = <function1>
// csv loader and others
scala> :paste
// Entering paste mode (ctrl-D to finish)
def snakeCaseToCamelCaseDataFrameColumns(path: String, loader: String => DataFrame): DataFrame = {
  val df = loader(path)  // load once and reuse the DataFrame
  val ccnames = df.columns.map { sc =>
    val ccn = sc.split("_")
    (ccn.head +: ccn.tail.map(_.capitalize)).mkString
  }
  df.toDF(ccnames: _*)
}
// Exiting paste mode, now interpreting.
snakeCaseToCamelCaseDataFrameColumns: (path: String, loader: String => org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
val oneDF = snakeCaseToCamelCaseDataFrameColumns("/path/to/table", tableloader)
val twoDF = snakeCaseToCamelCaseDataFrameColumns("/path/to/parquet/file", parquetloader)
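If data ever has to be written back to a snake_case table, the inverse mapping is handy too. A sketch under the assumption that every uppercase letter marks a word boundary (the helper name camelToSnake is mine):

```scala
// Convert one camelCase identifier back to snake_case:
// prefix each uppercase letter with "_" and lowercase it.
def camelToSnake(cc: String): String =
  cc.flatMap(c => if (c.isUpper) "_" + c.toLower else c.toString)

// Rename all columns before saving, e.g.:
// ds.toDF(ds.columns.map(camelToSnake): _*).write.saveAsTable("MyTable")
```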