Yur*_*man 9 scala apache-spark apache-spark-sql
我正在使用Spark Scala API.我有一个Spark SQL DataFrame(从Avro文件中读取),具有以下模式:
root
|-- ids: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: integer
| | |-- value: string (valueContainsNull = true)
|-- match: array (nullable = true)
| |-- element: integer (containsNull = true)
Run Code Online (Sandbox Code Playgroud)
基本上是2列[ids:List [Map [Int,String]],匹配:List [Int]].样本数据看起来像:
[List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)),List(0, 0, 1, 0)]
[List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)),List(1, 0, 1, 0)]
...
Run Code Online (Sandbox Code Playgroud)
我想做的是flatMap()每一行产生3列[ id,property,match ].使用上面的2行作为输入数据,我们将获得:
[1,a,0]
[2,b,0]
[3,c,1]
[4,d,0]
[5,c,1]
[6,a,0]
[7,e,1]
[8,d,0]
...
Run Code Online (Sandbox Code Playgroud)
然后产生groupBy的String 属性(例如:a,b,...)count("property")和sum("match"):
a 2 0
b 1 0
c 2 2
d 2 0
e 1 1
Run Code Online (Sandbox Code Playgroud)
我想做的事情如下:
val result = myDataFrame.select("ids","match").flatMap(
(row: Row) => row.getList[Map[Int,String]](1).toArray() )
result.groupBy("property").agg(Map(
"property" -> "count",
"match" -> "sum" ) )
Run Code Online (Sandbox Code Playgroud)
问题是flatMap将DataFrame转换为RDD.有一种很好的方法可以在使用DataFrames flatMap之后进行类型操作groupBy吗?
Dav*_*fin 11
这是什么flatMap做的,你想要什么?它将每个输入行转换为0行或更多行.它可以过滤掉它们,也可以添加新的.在SQL中获得您使用的相同功能join.你能做你想做的事join吗?
或者,您也可以查看Dataframe.explode,这只是一种特定的类型join(您可以explode通过将DataFrame连接到UDF 来轻松地创建自己的).explode将单个列作为输入,并允许您将其拆分或将其转换为多个值,然后join将原始行转换回新行.所以:
user groups
griffin mkt,it,admin
Run Code Online (Sandbox Code Playgroud)
可能成为:
user group
griffin mkt
griffin it
griffin admin
Run Code Online (Sandbox Code Playgroud)
所以我想看看,DataFrame.explode如果这不能让你轻松,请尝试加入UDF.
| 归档时间: |
|
| 查看次数: |
24641 次 |
| 最近记录: |