Log*_*ter 1 indexing scala dataframe apache-spark udf
我有一个火花数据框,其中一列由列表的索引组成.我想写一个udf,它允许我创建一个新的列,其中包含与索引相关的值.
例如
假设我有以下数据帧和数组:
val df = spark.createDataFrame(Seq((0, Array(1, 1, 2)), (1, Array(1, 2, 0))))
df.show()
+---+---------+
| _1| _2|
+---+---------+
| 0|[1, 1, 2]|
| 1|[1, 2, 0]|
+---+---------+
val sArray = Array("a", "b", "c")
Run Code Online (Sandbox Code Playgroud)
我希望能够将指标映射_2
到它们的值,sArray
从而导致:
+---+---------+---------+
| _1| _2| _3|
+---+---------+---------+
| 0|[1, 1, 2]|[b, b, c]|
| 1|[1, 2, 0]|[b, c, a]|
+---+---------+---------+
Run Code Online (Sandbox Code Playgroud)
我一直试图用udf做到这一点:
def indexer (values: Array[String]) =
udf((indices: Array[Int]) => indices.map(values(_)))
df.withColumn("_3", indexer(sArray)($"_2"))
Run Code Online (Sandbox Code Playgroud)
但是当我这样做时,我收到以下错误:
无法执行用户定义的功能
...引起:java.lang.ClassCastException:scala.collection.mutable.WrappedArray $ ofRef无法强制转换为[I
这里出了什么问题?我怎样才能解决这个问题?
ArrayType
在DataFrame中的列上操作时,传递给UDF的实际类型是mutable.WrappedArray
.您看到的失败是尝试将其WrappedArray
转换为Array[Int]
您的函数所期望的结果.
修复相当简单 - 定义期望的函数mutable.WrappedArray[Int]
:
def indexer (values: Array[String]): UserDefinedFunction = {
udf((indices: mutable.WrappedArray[Int]) => indices.map(values(_)))
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
369 次 |
最近记录: |