Maybe it's just because I'm relatively new to the API, but I feel Spark ML methods often return DataFrames that are unnecessarily hard to work with.

This time, it's the ALS model that's tripping me up. Specifically, the recommendForAllUsers method. Let's reconstruct the type of DataFrame it would return:
scala> import org.apache.spark.sql.types._

scala> val arrayType = ArrayType(new StructType().add("itemId", IntegerType).add("rating", FloatType))

scala> val recs = Seq((1, Array((1, .7), (2, .5))), (2, Array((0, .9), (4, .1)))).
  toDF("userId", "recommendations").
  select($"userId", $"recommendations".cast(arrayType))
scala> recs.show()
+------+------------------+
|userId|   recommendations|
+------+------------------+
|     1|[[1,0.7], [2,0.5]]|
|     2|[[0,0.9], [4,0.1]]|
+------+------------------+
scala> recs.printSchema
root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
Now, I only care about the itemId in the recommendations column. After all, the method is recommendForAllUsers, not recommendAndScoreForAllUsers (OK, fine, I'll stop being snarky...).

How do I do this?

I thought I had it when I created a UDF:

scala> val items = udf { recs: Array[(Int, Float)] => recs.map(_._1) }
But this produces an error:
scala> recs.withColumn("items", items($"recommendations"))
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(recommendations)' due to data type mismatch: argument 1 requires array<struct<_1:int,_2:float>> type, however, '`recommendations`' is of array<struct<itemId:int,rating:float>> type.;;
'Project [userId#87, recommendations#92, UDF(recommendations#92) AS items#238]
+- Project [userId#87, cast(recommendations#88 as array<struct<itemId:int,rating:float>>) AS recommendations#92]
+- Project [_1#84 AS userId#87, _2#85 AS recommendations#88]
+- LocalRelation [_1#84, _2#85]
Any ideas? Thanks!
Wow, my colleague came up with a really elegant solution:
scala> recs.select($"userId", $"recommendations.itemId").show
+------+------+
|userId|itemId|
+------+------+
|     1|[1, 2]|
|     2|[0, 4]|
+------+------+
So the Spark ML API isn't so hard after all :)
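The same dotted-path projection pulls out the scores too, should you ever need them; a quick sketch in the same vein:

scala> recs.select($"userId", $"recommendations.itemId", $"recommendations.rating")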
With an array as the type of a column, e.g. recommendations, you'd be quite productive using the explode function (or the more advanced flatMap operator; a sketch of that route follows near the end of this answer).

explode(e: Column): Column Creates a new row for each element in the given array or map column.

That gives you bare structs to work with.
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.explode

val structType = new StructType().
  add($"itemId".int).
  add($"rating".float)
val arrayType = ArrayType(structType)
val recs = Seq((1, Array((1, .7), (2, .5))), (2, Array((0, .9), (4, .1)))).
  toDF("userId", "recommendations").
  select($"userId", $"recommendations" cast arrayType)
val exploded = recs.withColumn("recs", explode($"recommendations"))
scala> exploded.show
+------+------------------+-------+
|userId|   recommendations|   recs|
+------+------------------+-------+
|     1|[[1,0.7], [2,0.5]]|[1,0.7]|
|     1|[[1,0.7], [2,0.5]]|[2,0.5]|
|     2|[[0,0.9], [4,0.1]]|[0,0.9]|
|     2|[[0,0.9], [4,0.1]]|[4,0.1]|
+------+------------------+-------+
Structs are nice in select operators with * (star) to flatten them into one column per struct field. You could do select($"element.*").
scala> exploded.select("userId", "recs.*").show
+------+------+------+
|userId|itemId|rating|
+------+------+------+
|     1|     1|   0.7|
|     1|     2|   0.5|
|     2|     0|   0.9|
|     2|     4|   0.1|
+------+------+------+
I think this should do what you're after.
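If you'd rather use the typed flatMap route mentioned above, here is a minimal sketch; the Rec and UserRecs case class names are mine, purely for illustration:

case class Rec(itemId: Int, rating: Float)
case class UserRecs(userId: Int, recommendations: Array[Rec])

// map each user's array of recommendations to one (userId, itemId) row apiece
val itemsPerUser = recs.as[UserRecs].
  flatMap(u => u.recommendations.map(r => (u.userId, r.itemId))).
  toDF("userId", "itemId")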
p.s. Stay away from UDFs for as long as possible, since they "trigger" a conversion of rows from the internal format (InternalRow) to JVM objects, which can lead to excessive GC.
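For what it's worth, that is also why the UDF in the question failed: elements of an array of structs reach a UDF as Row objects, not as Scala tuples. If you really had to go the UDF route, a minimal sketch that should type-check:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// struct elements arrive as Rows inside a UDF; pick the itemId field out by name
val items = udf { recs: Seq[Row] => recs.map(_.getAs[Int]("itemId")) }
recs.withColumn("items", items($"recommendations"))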