如何确保Spark DataFrame连接引起的分区?

Pau*_*aul 5 join dataframe apache-spark apache-spark-sql pyspark

我正在重写Spark应用程序以使用更多DataFrame操作来提高效率和稳健性.但是,有一部分应用程序无法使用DataFrames完成,我必须放弃到RDD.剥离其基本要素,代码将如下所示:

C = A.join(B, join_key) # join_key is a string naming a column
D = C.rdd.mapPartitions(do_something)
Run Code Online (Sandbox Code Playgroud)

为了正确操作,do_something需要C.rdd进行分区join_key.我认为情况就是这样,因为equijoins通过按键对数据进行分区,然后形成键值相同的对.在Spark RDD连接中,对由迭代器在分区数据上隐式形成,并且除非我告诉Spark将迭代器"物化"到一个列表中,否则它们将无法离开它们被定义的分区.配对然后重新分配结果,我不在这里做.我希望DataFrame连接也是如此.

总而言之,上面的讨论并未证明确保了所需的分区.我依赖于通过API无法保证的Spark实现的细节,我不确定它是100%安全的.不能保证Catalyst优化器不会将额外的分区边界抛到一组共享相同密钥的对中,将其分解并使我的算法不正确.

为了确保所需的分区,我可以C.rdd.partitionBy(lambda x: x['join_key'])在应用我的do_something函数之前明确地做,但我担心这可能会触发大量不必要的序列化,混乱或其他开销.

DISTRIBUTE BY根据这篇博客文章,我似乎也可以使用HiveQL,但同样,我不知道这可能引发什么开销.

我的问题是:依赖连接引起的隐式分区是安全的,还是我应该明确地确定它?如果是这样,确保它的最有效方法是什么?我正在使用PySpark 1.6.2.

zer*_*323 7

一般来说,特定的连接机制不是合同的一部分,当您对分区的假设失败时,您可以相对容易地构建合成示例.例如,在某些条件下join可以表示为BroadcastHashJoin不会触发重新分区:

from pyspark.sql.functions import broadcast

# Just so we can easily inspect the results
sqlContext.setConf("spark.sql.shuffle.partitions", 4)

a = (sc
    .parallelize([(1, "a"), (2, "b"), (3, "a"), (4, "b")],  2)
    .toDF(["id", "join_key"]))

# Lets hint optimizer that b can be broadcasted
b = broadcast(
    sc.parallelize([("a", "foo"), ("b", "bar")]).toDF(["join_key", "foobar"])
)

c = a.join(b, "join_key")
c.rdd.glom().collect()

## [[Row(join_key='a', id=1, foobar='foo'),
##  Row(join_key='b', id=2, foobar='bar')],
##  [Row(join_key='a', id=3, foobar='foo'),
##  Row(join_key='b', id=4, foobar='bar')]]
Run Code Online (Sandbox Code Playgroud)

还有一些其他条件可以在没有显式提示的情况下使用广播连接(参见例如Databricks Guide - SQL,DataFrames和Datasets/BroadcastHashJoin),并且无法保证将来不会添加一些其他机制.

如果您想确定结果,请明确重新分区.

c.repartition("join_key").rdd.glom().collect()

## [[],
##  [Row(join_key='b', id=2, foobar='bar'),
##  Row(join_key='b', id=4, foobar='bar')],
##  [Row(join_key='a', id=1, foobar='foo'),
##   Row(join_key='a', id=3, foobar='foo')],
##  []]
Run Code Online (Sandbox Code Playgroud)

这里还有一个问题就是使用DataFrames效率和稳健性.如果你的逻辑严重依赖于直接在Python中访问数据(与SQL表达式相反),使用DataFrames和传递数据几乎就是一种反模式.您可以查看我对Spark函数与UDF性能的答案吗?这涵盖了类似的问题.因此,在采用此方法之前,请务必进行基准测试,因为在许多情况下,移动数据的成本很容易消耗SQL优化的所有好处.

  • 我很高兴你发现它很有用.这是一个有趣的问题.关于RDD vs DataFrame,我没有尝试对此进行基准测试,所以我无法给出明确的答案,但我可能会选择`DataFrame`.由于它是一种专门的机制,即使不是现在也可以更加优化.请记住,`df.partitionBy`不会对`DataFrame`进行分区.要对DF进行分区,您应该在答案中使用`repartition`. (2认同)