Rap*_*oth 6 scala apache-spark
I want to write a Spark 1.6 UDF that takes the following map:
case class MyRow(mapping: Map[(Int, Int), Double])
val data = Seq(
MyRow(Map((1, 1) -> 1.0))
)
val df = sc.parallelize(data).toDF()
df.printSchema()
root
|-- mapping: map (nullable = true)
| |-- key: struct
| |-- value: double (valueContainsNull = false)
| | |-- _1: integer (nullable = false)
| | |-- _2: integer (nullable = false)
(As a side note: I find the output above odd, because the key's type is printed below the value's type. Why is that?)
Now I define my UDF as:
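import org.apache.spark.sql.functions.udf

// Fails at runtime: the struct key arrives as a Row, not as a Tuple2
val myUDF = udf((inputMapping: Map[(Int, Int), Double]) =>
  inputMapping.map { case ((i1, i2), value) => ((i1 + i2), value) }
)
df
  .withColumn("udfResult", myUDF($"mapping"))
  .show()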
But this gives me:
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
So I tried replacing (Int, Int) with a custom case class, since that is what I usually do when I want to pass a struct to a UDF:
case class MyTuple2(i1: Int, i2: Int)
val myUDF = udf((inputMapping: Map[MyTuple2, Double]) =>
inputMapping.map { case (MyTuple2(i1, i2), value) => ((i1 + i2), value) }
)
Strangely, this gives:
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(mapping)' due to data type mismatch: argument 1 requires map<struct<i1:int,i2:int>,double> type, however, 'mapping' is of map<struct<_1:int,_2:int>,double> type.
I don't understand the exception above, since the types appear to match.
The only (ugly) solution I have found is to pass an org.apache.spark.sql.Row and then "extract" the elements of the struct:
import org.apache.spark.sql.Row

val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping
  .map { case (key, value) => ((key.getInt(0), key.getInt(1)), value) } // extract Row into Tuple2
  .map { case ((i1, i2), value) => ((i1 + i2), value) }
)
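As far as I know, there is no way to avoid using Row in this situation: a tuple (or case class) used inside a map (or inside another tuple, case class, array, ...) is a nested struct, so it is represented as a Row when it is passed to the UDF.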
The only improvement I can suggest is to pattern match on Row (using the Row extractor) to simplify the code a little:
val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping
.map { case (Row(i1: Int, i2: Int), value) => (i1 + i2, value) }
)
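To check the pattern match without starting a Spark job, the function body can be pulled out and called on a hand-built Row. This is only a minimal sketch: the object name RowKeyExtraction and the helper sumKeys are hypothetical names introduced here for illustration, and it assumes spark-sql is on the classpath.

```scala
import org.apache.spark.sql.Row

object RowKeyExtraction {
  // Hypothetical helper (not from the original post): same logic as the
  // UDF body, summing the two Int fields of each struct key.
  def sumKeys(inputMapping: Map[Row, Double]): Map[Int, Double] =
    inputMapping.map { case (Row(i1: Int, i2: Int), value) => (i1 + i2, value) }

  def main(args: Array[String]): Unit = {
    // Row(1, 1) stands in for the struct key that Spark would pass to the UDF
    val input = Map(Row(1, 1) -> 1.0)
    println(sumKeys(input)) // prints Map(2 -> 1.0)
  }
}
```

Under the same assumptions, the helper could then be wrapped as udf(RowKeyExtraction.sumKeys _) and applied to the mapping column as before. Note that the match is not exhaustive: a key with a different arity or field types would throw a scala.MatchError.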