Pau*_*aul 5 join dataframe apache-spark apache-spark-sql pyspark
我正在重写Spark应用程序以使用更多DataFrame操作来提高效率和稳健性.但是,有一部分应用程序无法使用DataFrames完成,我必须放弃到RDD.剥离其基本要素,代码将如下所示:
C = A.join(B, join_key) # join_key is a string naming a column
D = C.rdd.mapPartitions(do_something)
Run Code Online (Sandbox Code Playgroud)
为了正确操作,do_something需要C.rdd进行分区join_key.我认为情况就是这样,因为equijoins通过按键对数据进行分区,然后形成键值相同的对.在Spark RDD连接中,对由迭代器在分区数据上隐式形成,并且除非我告诉Spark将迭代器"物化"到一个列表中,否则它们将无法离开它们被定义的分区.配对然后重新分配结果,我不在这里做.我希望DataFrame连接也是如此.
总而言之,上面的讨论并未证明确保了所需的分区.我依赖于通过API无法保证的Spark实现的细节,我不确定它是100%安全的.不能保证Catalyst优化器不会将额外的分区边界抛到一组共享相同密钥的对中,将其分解并使我的算法不正确.
为了确保所需的分区,我可以C.rdd.partitionBy(lambda x: x['join_key'])在应用我的do_something函数之前明确地做,但我担心这可能会触发大量不必要的序列化,混乱或其他开销.
DISTRIBUTE BY根据这篇博客文章,我似乎也可以使用HiveQL,但同样,我不知道这可能引发什么开销.
我的问题是:依赖连接引起的隐式分区是安全的,还是我应该明确地确定它?如果是这样,确保它的最有效方法是什么?我正在使用PySpark 1.6.2.
一般来说,特定的连接机制不是合同的一部分,当您对分区的假设失败时,您可以相对容易地构建合成示例.例如,在某些条件下join可以表示为BroadcastHashJoin不会触发重新分区:
from pyspark.sql.functions import broadcast
# Just so we can easily inspect the results
sqlContext.setConf("spark.sql.shuffle.partitions", 4)
a = (sc
.parallelize([(1, "a"), (2, "b"), (3, "a"), (4, "b")], 2)
.toDF(["id", "join_key"]))
# Lets hint optimizer that b can be broadcasted
b = broadcast(
sc.parallelize([("a", "foo"), ("b", "bar")]).toDF(["join_key", "foobar"])
)
c = a.join(b, "join_key")
c.rdd.glom().collect()
## [[Row(join_key='a', id=1, foobar='foo'),
## Row(join_key='b', id=2, foobar='bar')],
## [Row(join_key='a', id=3, foobar='foo'),
## Row(join_key='b', id=4, foobar='bar')]]
Run Code Online (Sandbox Code Playgroud)
还有一些其他条件可以在没有显式提示的情况下使用广播连接(参见例如Databricks Guide - SQL,DataFrames和Datasets/BroadcastHashJoin),并且无法保证将来不会添加一些其他机制.
如果您想确定结果,请明确重新分区.
c.repartition("join_key").rdd.glom().collect()
## [[],
## [Row(join_key='b', id=2, foobar='bar'),
## Row(join_key='b', id=4, foobar='bar')],
## [Row(join_key='a', id=1, foobar='foo'),
## Row(join_key='a', id=3, foobar='foo')],
## []]
Run Code Online (Sandbox Code Playgroud)
这里还有一个问题就是使用DataFrames的效率和稳健性.如果你的逻辑严重依赖于直接在Python中访问数据(与SQL表达式相反),使用DataFrames和传递数据几乎就是一种反模式.您可以查看我对Spark函数与UDF性能的答案吗?这涵盖了类似的问题.因此,在采用此方法之前,请务必进行基准测试,因为在许多情况下,移动数据的成本很容易消耗SQL优化的所有好处.
| 归档时间: |
|
| 查看次数: |
6269 次 |
| 最近记录: |