在 Spark 中加入多个表的有效方法 - 设备上没有剩余空间

cph*_*sto 5 hadoop-partitioning apache-spark pyspark

这里也有人问类似的问题,但它没有正确解决我的问题。我有近 100 个数据帧,每个数据帧至少有200,000行,我需要通过full基于列进行连接来加入它们ID,从而创建一个带有列的数据帧 - ID, Col1, Col2,Col3,Col4, Col5..., Col102

只是为了说明,我的 DataFrames 的结构 -

df1 =                          df2 =            df3 =          .....  df100 = 
+----+------+------+------+    +----+------+    +----+------+         +----+------+ 
|  ID|  Col1|  Col2|  Col3|    |  ID|  Col4|    |  ID|  Col5|         |  ID|Col102|
+----+------+-------------+    +----+------+    +----+------+         +----+------+
| 501|  25.1|  34.9| 436.9|    | 501| 22.33|    | 503| 22.33|         | 501|  78,1|
| 502|  12.2|3225.9|  46.2|    | 502| 645.1|    | 505| 645.1|         | 502|  54.9|
| 504| 754.5| 131.0| 667.3|    | 504| 547.2|    | 504| 547.2|         | 507|     0|
| 505|324.12| 48.93|  -1.3|    | 506|     2|    | 506|     2|         | 509| 71.57|
| 506| 27.51| 88.99|  67.7|    | 507| 463.7|    | 507| 463.7|         | 510|  82.1|
.
.
+----+------+------|------|    |----|------|    |----|------|         |----|------|
Run Code Online (Sandbox Code Playgroud)

我开始通过full对所有数据帧进行连接来开始加入这些数据帧。自然,这是计算密集型过程,必须努力减少shuffles跨不同工作节点的数量。因此,我开始通过分割数据帧df1基于ID使用重新分配() ,其中hash-partitions所述数据帧基于ID到30个分区-

df1 = df1.repartition(30,'ID')
Run Code Online (Sandbox Code Playgroud)

现在,我fulldf1和之间进行连接df2

df = df1.join(df2,['ID'],how='full')
df.persist()
Run Code Online (Sandbox Code Playgroud)

由于df1已经是hash-partitioned,所以我原以为join上面的这会跳过洗牌并保持partitionerof df1,但我注意到 ashuffle确实发生了,它将分区数量df增加到200. 现在,如果我通过如下所示的函数调用它们来继续加入后续的数据帧,我会收到错误java.io.IOException: No space left on device-

def rev(df,num):
     df_temp = spark.read.load(filename+str(num)+'.csv')
     df_temp.persist()
     df = df.join(df_temp,['ID'],how='full')
     df_temp.unpersist()
     return df

df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the ERROR here below, when I call the first action count() - 
print("Total number of rows: "+str(df.count()))
df.unpersist()  # Never reached this stage.
Run Code Online (Sandbox Code Playgroud)

更新:错误信息 -

Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
Run Code Online (Sandbox Code Playgroud)

问题: 1. 为什么df1我们第一次做partitioner的时候没有维护join

2.如何有效地连接这些多个表并避免此No space left on device问题?用户@silvio在这里建议使用.bucketBy(),但他也暗示了分区程序将被维护的事实,这并没有发生。因此,我不确定连接这些多个 DataFrame 的有效方法是什么。

任何建议/提示将不胜感激。

oll*_*ik1 1

我过去也遇到过类似的问题,只是没有那么多 RDD。我能找到的最有效的解决方案是使用低级 RDD API。首先存储所有 RDD,以便它们在分区内按连接列进行(哈希)分区和排序: https: //spark.apache.org/docs/2.4.0/api/java/org/apache/spark /rdd/OrderedRDDFunctions.html#repartitionAndSortWithinPartitions-org.apache.spark.Partitioner-

此后,可以使用 zip 分区实现连接,而无需洗牌或使用大量内存:https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/RDD.html#zipPartitions -org.apache.spark.rdd.RDD-boolean-scala.Function2-scala.reflect.ClassTag-scala.reflect.ClassTag-


归档时间:

查看次数:

7071 次

最近记录:

6 年,5 月 前