Vit*_*ion 4 binary scala user-defined-functions apache-spark apache-spark-sql
原则上,我有一个由“名称”和“值”DataFrame字段组成的。第一个字段是 a ,第二个字段是 an 。StringArray[Byte]
我想要对每条记录执行的操作DataFrame是应用任何函数,使用 aUDF并创建一个新列。当“Values”是一个时,这非常有效Array[Int]。但是,当它是 时Array[Byte],会出现以下错误:
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(Values)' due to data type mismatch: argument 1 requires array<tinyint> type, however, '`Values`' is of binary type.;;
'Project [Name#47, Values#48, UDF(Values#48) AS TwoTimes#56]
+- Project [_1#44 AS Name#47, _2#45 AS Values#48]
+- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#44, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#45]
+- ExternalRDD [obj#43]
Run Code Online (Sandbox Code Playgroud)
完整代码如下:
scala> val df1 = spark.sparkContext.parallelize(Seq(("one", Array[Byte](1, 2, 3, 4, 5)), ("two", Array[Byte](6, 7, 8, 9, 10)))).toDF("Name", "Values")
df1: org.apache.spark.sql.DataFrame = [Name: string, Values: binary]
scala> df1.show
+----+----------------+
|Name| Values|
+----+----------------+
| one|[01 02 03 04 05]|
| two|[06 07 08 09 0A]|
+----+----------------+
scala> val twice = udf { (values: Seq[Byte]) =>
| val result = Array.ofDim[Byte](values.length)
| for (i <- values.indices)
| result(i) = (2 * values(i).toInt).toByte
| result
| }
twice: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BinaryType,Some(List(ArrayType(ByteType,false))))
scala> val df2 = df1.withColumn("TwoTimes", twice('Values))
Run Code Online (Sandbox Code Playgroud)
我知道这样的错误是由于错误的数据类型(预期:Array[Byte],但它找到了 a Binary)而出现的,但我不明白的是为什么 Spark 将我推断Array[Byte]为 a Binary。有人可以向我解释一下吗?
如果我必须使用Binary而不是Array[Byte],我应该如何在我的内部处理它UDF?
我澄清一下,我的原作UDF没有使用简单的for循环。据我所知,在这个例子中,这可以用map方法来代替。
在 Spark 中,Array[Byte]表示为BinaryType. 从文档中我们可以看到:
public class BinaryType extends DataType
表示值的数据类型Array[Byte]。请使用单例 DataTypes.BinaryType。
因此,Array[Byte]和Binary是相同的,但是,它们之间存在一些差异,Seq[Byte]这会导致错误。
要解决此问题,只需在 udf 中替换Seq[Byte]为:Array[Byte]
val twice = udf { (values: Array[Byte]) =>
val result = Array.ofDim[Byte](values.length)
for (i <- values.indices)
result(i) = (2 * values(i).toInt).toByte
result
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7466 次 |
| 最近记录: |