Lev*_*Lev 5 user-defined-functions dataframe apache-spark apache-spark-sql
I am trying to use a UDF whose input type is an array of structs. I have the following data structure (this is only the relevant part of a larger structure):
|--investments: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- funding_round: struct (nullable = true)
| | | |-- company: struct (nullable = true)
| | | | |-- name: string (nullable = true)
| | | | |-- permalink: string (nullable = true)
| | | |-- funded_day: long (nullable = true)
| | | |-- funded_month: long (nullable = true)
| | | |-- funded_year: long (nullable = true)
| | | |-- raised_amount: long (nullable = true)
| | | |-- raised_currency_code: string (nullable = true)
| | | |-- round_code: string (nullable = true)
| | | |-- source_description: string (nullable = true)
| | | |-- source_url: string (nullable = true)
I declared the case classes:
case class Company(name: String, permalink: String)
case class FundingRound(
  company: Company,
  funded_day: Long,
  funded_month: Long,
  funded_year: Long,
  raised_amount: Long,
  raised_currency_code: String,
  round_code: String,
  source_description: String,
  source_url: String)
case class Investments(funding_round: FundingRound)
And the UDF declaration:
sqlContext.udf.register("total_funding", (investments: Seq[Investments]) => {
  val totals = investments.map(r => r.funding_round.raised_amount)
  totals.sum
})
When I execute the following transformation, the result is as expected:
scala> sqlContext.sql("""select total_funding(investments) from companies""")
res11: org.apache.spark.sql.DataFrame = [_c0: bigint]
But when an action such as collect is executed, I get an error:
Executor: Exception in task 0.0 in stage 4.0 (TID 10)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments
Thanks for any help.
zer*_*323 12
The error you see should be pretty much self-explanatory. There is a strict mapping between Catalyst / SQL types and Scala types, which can be found in the relevant section of the Spark SQL, DataFrames and Datasets Guide.
In particular, struct types are converted to o.a.s.sql.Row (in your particular case the data will be exposed as Seq[Row]).
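To make that mapping concrete, here is a minimal sketch (the values are invented for illustration): a struct value reaches Scala code as a generic Row, not as your case class, and its fields are read positionally or, when a schema is attached, by name.

```scala
import org.apache.spark.sql.Row

// Illustrative only: a struct<name: string, permalink: string> value
// arrives in a UDF as a generic Row, not as the Company case class.
val company: Row = Row("Acme", "/company/acme")

val name      = company.getString(0) // positional access
val permalink = company.getString(1)
```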
There are different methods you can use to expose the data as specific types, such as writing the UDF against Row objects directly, or converting the DataFrame to a Dataset[T], where T is the desired local type. Only the former approach is applicable in this particular case.
If you want to access investments.funding_round.raised_amount using a UDF, you'll need something like this:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val getRaisedAmount = udf((investments: Seq[Row]) => scala.util.Try(
  investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount"))
).toOption)
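Since the original goal was a total, the same Row-based pattern can also sum inside the UDF. This is a sketch, assuming a DataFrame named df holding the companies data and that implicits for the $ syntax are in scope; the Try guards against null entries:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Extract raised_amount from each nested struct and sum; None on failure.
val totalFunding = udf((investments: Seq[Row]) => scala.util.Try(
  investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount")).sum
).toOption)

df.select(totalFunding($"investments").as("total_funding"))
```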
But a simple select should be much safer and cleaner:
df.select($"investments.funding_round.raised_amount")
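And if what you ultimately want is the total per company rather than the raw amounts, a UDF-free sketch using Spark's built-in functions (assuming the full schema also has a top-level name column, which is not shown in the excerpt above) would be to explode the array and aggregate:

```scala
import org.apache.spark.sql.functions.{explode, sum}

// One row per investment, then sum the nested field per company.
df.select($"name", explode($"investments").as("inv"))
  .groupBy($"name")
  .agg(sum($"inv.funding_round.raised_amount").as("total_funding"))
```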