Par*_*kar 6 python bigdata dataframe apache-spark pyspark
我的问题如下:
我有一个大数据details
框,称为包含900K行,另一个包含80M行,称为attributes
。
两者都有一列A
,我想在该列上进行左外连接,左数据框为deatils
。
A
数据框中的列中只有75,000个唯一条目details
。列中的数据框attributes
80M唯一条目A
。
什么是实现join
手术的最佳方法?
我尝试了什么?
简单的连接即details.join(attributes, "A", how="left_outer")
超时(或释放内存)。
由于in列A
中只有75,000个唯一条目details
,因此我们不在乎in数据框中的其余条目attributes
。因此,首先我使用以下方法进行过滤:
uniqueA = details.select('A').distinct().collect()
uniqueA = map(lambda x: x.A, uniqueA)
attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))
Run Code Online (Sandbox Code Playgroud)
我认为这可以解决,因为该attributes
表从8000万行减少到仅仅75000行。但是,仍然需要永远完成join
(并且永远不会完成)。
接下来,我认为分区太多,要连接的数据不在同一分区上。虽然,我不知道如何将所有数据带到同一分区,但我认为重新分区可能会有所帮助。所以就到这里。
details_repartitioned = details.repartition("A")
attributes_repartitioned = attributes.repartition("A")
Run Code Online (Sandbox Code Playgroud)上述操作将分区的数量attributes
从70K减少到200。分区的details
数量约为1100。
details_attributes = details_repartitioned.join(broadcast(
attributes_repartitioned), "A", how='left_outer') # tried without broadcast too
Run Code Online (Sandbox Code Playgroud)毕竟,join
仍然无法使用。我仍在学习PySpark,因此我可能误解了重新分区背后的基础知识。如果有人可以阐明这一点,那就太好了。
PS我已经看过这个问题,但是不能回答这个问题。
详细信息表有 900k 个项目,在 A 列中有 75k 个不同的条目。我认为您尝试过的 A 列上的过滤器是一个正确的方向。但是,collect 和后面的 map 操作
attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))
Run Code Online (Sandbox Code Playgroud)
这太贵了。另一种方法是
uniqueA = details.select('A').distinct().persist(StorageLevel.DISK_ONLY)
uniqueA.count // Breaking the DAG lineage
attrJoined = attributes.join(uniqueA, "inner")
Run Code Online (Sandbox Code Playgroud)
此外,如果您还没有这样做,您可能需要正确设置 shuffle 分区。
您的数据集中可能发生的一个问题是偏斜。它可能发生在 75k 个唯一值中,只有少数值与属性表中的大量行相连。在这种情况下,加入可能需要更长的时间并且可能无法完成。
要解决这个问题,您需要找到 A 列的偏斜值并单独处理它们。