PySpark:不使用循环将 DataFrame 拆分为多个 DataFrame

sji*_*han 3 python apache-spark pyspark spark-dataframe

嗨,我有一个如图所示的 DataFrame -

ID       X        Y

1      1234      284

1      1396      179

2      8620      178

3      1620      191

3      8820      828
Run Code Online (Sandbox Code Playgroud)

我想根据 ID 将此 DataFrame 拆分为多个 DataFrame。因此,对于此示例,将有 3 个 DataFrame。实现它的一种方法是在循环中运行过滤器操作。但是,我想知道是否可以以更有效的方式完成。

Jam*_*bin 6

#initialize spark dataframe
df = sc.parallelize([ (1,1234,282),(1,1396,179),(2,8620,178),(3,1620,191),(3,8820,828) ] ).toDF(["ID","X","Y"])

#get the list of unique ID values ; there's probably a better way to do this, but this was quick and easy
listids = [x.asDict().values()[0] for x in df.select("ID").distinct().collect()]
#create list of dataframes by IDs
dfArray = [df.where(df.ID == x) for x in listids]

dfArray[0].show()
+---+----+---+
| ID|   X|  Y|
+---+----+---+
|  1|1234|282|
|  1|1396|179|
+---+----+---+
dfArray[1].show()
+---+----+---+
| ID|   X|  Y|
+---+----+---+
|  2|8620|178|
+---+----+---+

dfArray[2].show()
+---+----+---+
| ID|   X|  Y|
+---+----+---+
|  3|1620|191|
|  3|8820|828|
+---+----+---+
Run Code Online (Sandbox Code Playgroud)

  • 你正在循环。我认为这最接近我所寻求的。http://stackoverflow.com/questions/41663985/spark-dataframe-how-to-efficiently-split-dataframe-for-each-group-based-on-same 但这是与之相关的 I/O 时间。 (2认同)
  • 是的,但是您可以将任务映射到不同的分区并获取 DF 列表。这就是我正在努力做的事情。 (2认同)

小智 6

如果您使用 Python 3.X,@James Tobin 的答案需要稍微改变一下,因为 dict.values 返回一个 dict-value 对象而不是列表。一个快速的解决方法就是添加列表函数:

listids = [list(x.asDict().values())[0] 
           for x in df.select("ID").distinct().collect()]
Run Code Online (Sandbox Code Playgroud)

作为单独的答案发布,因为我没有对他的答案发表评论所需的声誉。