连接大型海量数据框架

Par*_*kar 6 python bigdata dataframe apache-spark pyspark

我的问题如下:

  • 我有一个大数据details框,称为包含900K行,另一个包含80M行,称为attributes

  • 两者都有一列A,我想在该列上进行左外连接,左数据框为deatils

  • A数据框中的列中只有75,000个唯一条目details。列中的数据框attributes80M唯一条目A

什么是实现join手术的最佳方法?

我尝试了什么?

  • 简单的连接即details.join(attributes, "A", how="left_outer")超时(或释放内存)。

  • 由于in列A中只有75,000个唯一条目details,因此我们不在乎in数据框中的其余条目attributes。因此,首先我使用以下方法进行过滤:

    uniqueA = details.select('A').distinct().collect()
    uniqueA = map(lambda x: x.A, uniqueA)
    attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))
    
    Run Code Online (Sandbox Code Playgroud)

    我认为这可以解决,因为该attributes表从8000万行减少到仅仅75000行。但是,仍然需要永远完成join(并且永远不会完成)。

  • 接下来,我认为分区太多,要连接的数据不在同一分区上。虽然,我不知道如何将所有数据带到同一分区,但我认为重新分区可能会有所帮助。所以就到这里。

    details_repartitioned = details.repartition("A")
    attributes_repartitioned = attributes.repartition("A")
    
    Run Code Online (Sandbox Code Playgroud)
  • 上述操作将分区的数量attributes从70K减少到200。分区的details数量约为1100。

    details_attributes = details_repartitioned.join(broadcast(
    attributes_repartitioned), "A", how='left_outer')  # tried without broadcast too
    
    Run Code Online (Sandbox Code Playgroud)

毕竟,join仍然无法使用。我仍在学习PySpark,因此我可能误解了重新分区背后的基础知识。如果有人可以阐明这一点,那就太好了。

PS我已经看过这个问题,但是不能回答这个问题。

Avi*_*rya 7

详细信息表有 900k 个项目,在 A 列中有 75k 个不同的条目。我认为您尝试过的 A 列上的过滤器是一个正确的方向。但是,collect 和后面的 map 操作

attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA)) 
Run Code Online (Sandbox Code Playgroud)

这太贵了。另一种方法是

uniqueA = details.select('A').distinct().persist(StorageLevel.DISK_ONLY)
uniqueA.count // Breaking the DAG lineage
attrJoined = attributes.join(uniqueA, "inner")
Run Code Online (Sandbox Code Playgroud)

此外,如果您还没有这样做,您可能需要正确设置 shuffle 分区。

您的数据集中可能发生的一个问题是偏斜。它可能发生在 75k 个唯一值中,只有少数值与属性表中的大量行相连。在这种情况下,加入可能需要更长的时间并且可能无法完成。

要解决这个问题,您需要找到 A 列的偏斜值并单独处理它们。

  • 是的,打破 DAG 是最重要的。要打破 DAG,你需要在 Dataframe 上做一个 Action,比如计数、收集、保存等。因为除了动作之外,spark 中的所有操作都是惰性的。我发现计数比显式保存更容易。不确定是否有任何替代方法可以实现 (2认同)
  • 可能那是因为如果长 DAG 底部的阶段失败;spark 需要重新计算整个 DAG。这会产生后续故障并最终使作业失败。然而,如果我们打破 DAG ,任何阶段失败都不需要完全重新计算。这可能是一个原因,尽管我不完全确定。 (2认同)