如何在 Pyspark 中高效连接一个非常大的表和一个大表

Moh*_*han 5 apache-spark apache-spark-sql pyspark

我有两张桌子。这两个表都是 Hive 中以 parquet 数据格式存储的外部表。

第一个表table_1从 2015 年开始每天有2.5 亿行。该表根据 create_date 进行分区。因此,对于每个 create_date,大约有 250M 行。

第二个表 - table_2是每日增量表,平均行数约为150 万行。

两个表中有一个公共列“lookup_id”。现在,我需要使用数据帧从 table_1 中获取所有列,以获取 table_2 中的增量数据。

我想做如下的事情

table_1=spark.table("table_1")
table_2=spark.table("table_2")
result_df=table_1.join(table_2, table_1.lookup_id=table_2.lookup_id, "inner").drop(table_2.lookup_id)
Run Code Online (Sandbox Code Playgroud)

但我怀疑这是否真的有效,以及 pyspark 是否能够在没有任何内存错误的情况下处理这个问题。

问题1: 如何基于create_date分区并行化table_1扫描?

问题 2: 是否有其他方法可以基于 table_2 中的lookup_ids 和/或基于分区来优化 table_1 扫描?

其他信息可以让我更清楚地了解我正在寻找的内容:

我试图了解当我们使用数据帧连接表时,spark 是否读取数据并将其保存在内存中并连接它们,或者它只是在读取自身时连接。如果第二个为 true,则第二个语句适用于哪些连接。另外,如果需要使用循环来避免任何内存错误。

Rag*_*ghu 6

不确定您的驱动程序和执行程序内存,但通常有两种可能的连接优化 - 将小表广播到所有执行程序并为两个数据帧使用相同的分区键。在您的情况下,如果表 2 太大而无法广播,则根据您的查找 ID 重新分区将使速度更快。但赔偿有其自身的成本。您可以在这里找到更多信息 - https://umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/avoiding_shuffle_less_stage-_more_fast#:~:text=One%20way%20to%20avoid%20shuffles,then%20broadcast %20to%20every%20执行者

让我知道你的想法。期待对此主题的讨论。

如果您无法广播,请使用存储桶避免连接的示例 - 灵感来自此处:Spark:在连接两个相同分区的数据帧时防止洗牌/交换

spark.catalog.setCurrentDatabase(<your databasename>)
test1.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item')
test2.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item1')
#test1.

#%%
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # this is just to disable auto broadcasting for testing
import pyspark.sql.functions as F
inputDf1 = spark.sql("select * from table_item")
inputDf2 = spark.sql("select * from table_item1")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"),on='item')
Run Code Online (Sandbox Code Playgroud)

现在尝试

inputDf3.explain()
Run Code Online (Sandbox Code Playgroud)

结果将是这样的:

== Physical Plan ==
*(3) Project [item#1033, col1#1030, col2#1031, col3#1032, id#1038]
+- *(3) SortMergeJoin [item#1033], [item#1039], Inner
   :- *(1) Sort [item#1033 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [col1#1030, col2#1031, col3#1032, item#1033]
   :     +- *(1) Filter isnotnull(item#1033)
   :        +- *(1) FileScan parquet 
   +- *(2) Sort [item#1039 ASC NULLS FIRST], false, 0
      +- *(2) Project [id#1038, item#1039]
         +- *(2) Filter isnotnull(item#1039)
            +- *(2) FileScan parquet 
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,这里没有发生 Exchange 哈希分区。因此,请尝试对两个数据框进行存储并尝试加入。