Pau*_*aul 5 python dataframe apache-spark apache-spark-sql pyspark
我对使用Spark SQL(1.6)执行以下形式的“过滤的等联接”感兴趣
A inner join B where A.group_id = B.group_id and pair_filter_udf(A[cols], B[cols])
Run Code Online (Sandbox Code Playgroud)
这里的group_id
是粗略的:单个值group_id
可以与A和B中的10,000条记录相关联。
如果等值联接是由自身执行的,如果没有pair_filter_udf
,则的粗略性group_id
会产生计算问题。例如,对于group_id
在A和B中都有10,000条记录的a,联接中将有1亿个条目。如果我们有成千上万个如此大的组,我们将生成一个巨大的表,并且很容易耗尽内存。
因此,至关重要的是我们向下推pair_filter_udf
入连接并在生成对时对其进行过滤,而不是等到所有对都生成后再进行过滤。我的问题是Spark SQL是否这样做。
我建立了一个简单的过滤式等联接,并询问Spark的查询计划是什么:
# run in PySpark Shell
import pyspark.sql.functions as F
sq = sqlContext
n=100
g=10
a = sq.range(n)
a = a.withColumn('grp',F.floor(a['id']/g)*g)
a = a.withColumnRenamed('id','id_a')
b = sq.range(n)
b = b.withColumn('grp',F.floor(b['id']/g)*g)
b = b.withColumnRenamed('id','id_b')
c = a.join(b,(a.grp == b.grp) & (F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp'])
c = c.sort('id_a')
c = c[['grp','id_a','id_b']]
c.explain()
Run Code Online (Sandbox Code Playgroud)
结果:
== Physical Plan ==
Sort [id_a#21L ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(id_a#21L ASC,200), None
+- ConvertToSafe
+- Project [grp#20L,id_a#21L,id_b#24L]
+- Filter (abs((id_a#21L - id_b#24L)) < 2)
+- SortMergeJoin [grp#20L], [grp#23L]
:- Sort [grp#20L ASC], false, 0
: +- TungstenExchange hashpartitioning(grp#20L,200), None
: +- Project [id#19L AS id_a#21L,(FLOOR((cast(id#19L as double) / 10.0)) * 10) AS grp#20L]
: +- Scan ExistingRDD[id#19L]
+- Sort [grp#23L ASC], false, 0
+- TungstenExchange hashpartitioning(grp#23L,200), None
+- Project [id#22L AS id_b#24L,(FLOOR((cast(id#22L as double) / 10.0)) * 10) AS grp#23L]
+- Scan ExistingRDD[id#22L]
Run Code Online (Sandbox Code Playgroud)
这些是计划中的关键内容:
+- Filter (abs((id_a#21L - id_b#24L)) < 2)
+- SortMergeJoin [grp#20L], [grp#23L]
Run Code Online (Sandbox Code Playgroud)
这些行给人的印象是,过滤将在联接之后的单独阶段中完成,这不是期望的行为。但是也许它被隐式地压入了联接中,而查询计划只是缺乏这种细节水平。
在这种情况下,我如何知道Spark在做什么?
更新:
我正在使用n = 1e6和g = 1e5进行实验,如果Spark不执行下推操作,这应该足以使笔记本电脑崩溃。由于它没有崩溃,我想它正在下推。但是,了解它的工作原理以及Spark SQL源代码的哪些部分负责这一出色的优化将很有趣。
很大程度上取决于您所说的下推。如果您问是否|a.id_a - b.id_b| < 2
将执行作为join
逻辑的一部分,a.grp = b.grp
则答案是否定的。不基于相等性的谓词不直接包含在join
条件中。
一种可以说明这一点的方法是使用DAG而不是执行计划。它看起来或多或少是这样的:
如您所见,它filter
是作为与分开的转换执行的SortMergeJoin
。另一种方法是在您退出时分析执行计划a.grp = b.grp
。您会看到它扩展join
为笛卡尔乘积,后面是filter
,没有其他优化:
d = a.join(b,(F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp'])
## == Physical Plan ==
## Project [id_a#2L,grp#1L,id_b#5L]
## +- Filter (abs((id_a#2L - id_b#5L)) < 2)
## +- CartesianProduct
## :- ConvertToSafe
## : +- Project [id#0L AS id_a#2L,(FLOOR((cast(id#0L as double) / 10.0)) * 10) AS grp#1L]
## : +- Scan ExistingRDD[id#0L]
## +- ConvertToSafe
## +- Project [id#3L AS id_b#5L]
## +- Scan ExistingRDD[id#3L]
Run Code Online (Sandbox Code Playgroud)
这是否意味着您的代码(不是带有笛卡尔的代码-您实际上要避免这种情况)生成了一个巨大的中间表?
不,不是。两者SortMergeJoin
和filter
都作为一个阶段执行(请参阅DAG)。尽管DataFrame
可以在较低的层次上应用一些操作细节,但是基本上这只是Scala上的一系列转换,Iterators
而且正如Justin Pihony以非常说明性的方式显示的那样,可以将不同的操作压缩在一起,而无需添加任何Spark-具体逻辑。在单个任务中将应用这两种过滤器的一种或另一种方式。
归档时间: |
|
查看次数: |
855 次 |
最近记录: |