kru*_*har 10 dataframe apache-spark apache-spark-sql spark-dataframe
我正在尝试对集合进行一些分析.我有一个示例数据集,如下所示:
orders.json
{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}
Run Code Online (Sandbox Code Playgroud)
它只是一个字段,它是一个代表ID的数字列表.
这是我试图运行的Spark脚本:
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("Dataframe Test")
val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)
val dataframe = sql.read.json("orders.json")
val expanded = dataframe
.explode[::[Long], Long]("items", "item1")(row => row)
.explode[::[Long], Long]("items", "item2")(row => row)
val grouped = expanded
.where(expanded("item1") !== expanded("item2"))
.groupBy("item1", "item2")
.count()
val recs = grouped
.groupBy("item1")
Run Code Online (Sandbox Code Playgroud)
创建expanded
并且grouped
很好,简而言之,expanded
是两个ID在同一原始集中的所有可能的两个ID的列表.grouped
过滤掉与自身匹配的ID,然后将所有唯一ID组合在一起并为每个ID生成计数.架构和数据样本grouped
是:
root
|-- item1: long (nullable = true)
|-- item2: long (nullable = true)
|-- count: long (nullable = false)
[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...
Run Code Online (Sandbox Code Playgroud)
所以,我的问题是:我现在如何对每个结果中的第一项进行分组,以便我有一个元组列表?对于上面的示例数据,我期望类似于此:
[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]
Run Code Online (Sandbox Code Playgroud)
正如您在我的脚本中所看到的recs
,我认为您将首先在'item1'上执行groupBy,这是每行中的第一项.但是之后你会留下这个对它有非常有限的动作的GroupedData对象.真的,你只剩下像sum,avg等聚合.我只想列出每个结果中的元组.
我此时可以轻松使用RDD功能,但这与使用Dataframes不同.有没有办法使用dataframe函数执行此操作.
Wil*_*ton 12
您可以使用org.apache.spark.sql.functions
(collect_list
和struct
)自1.6开始构建它
val recs =grouped.groupBy('item1).agg(collect_list(struct('item2,'count)).as("set"))
+-----+----------------------------+
|item1|set |
+-----+----------------------------+
|1 |[[5,3], [4,1], [3,2], [2,2]]|
|2 |[[4,1], [1,2], [5,2], [3,1]]|
+-----+----------------------------+
Run Code Online (Sandbox Code Playgroud)
您可以使用collect_set
也
编辑:有关信息,tuples
请勿在数据框中存在.最接近的结构是struct
因为它们相当于无类型数据集API中的大小写类.
编辑2:还要collect_set
注意警告,结果实际上不是一个集合(在SQL类型中没有具有set属性的数据类型).这意味着你可以得到不同的"集合",它们的顺序不同(至少在2.1.0版本中).sort_array
然后有必要对它们进行排序.
归档时间: |
|
查看次数: |
9065 次 |
最近记录: |