多个RDD的Spark联合

use*_*714 35 python apache-spark rdd pyspark

在我的猪代码中,我这样做:

all_combined = Union relation1, relation2, 
    relation3, relation4, relation5, relation 6.
Run Code Online (Sandbox Code Playgroud)

我想用火花做同样的事情.然而,不幸的是,我发现我必须继续这样做:

first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on
Run Code Online (Sandbox Code Playgroud)

是否有一个联合运算符可以让我一次操作多个rdds:

例如 union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)

这是一个方便的问题.

zer*_*323 79

如果这些是RDD,您可以使用SparkContext.union方法:

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd3 = sc.parallelize([7, 8, 9])

rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()

## [1, 2, 3, 4, 5, 6, 7, 8, 9]
Run Code Online (Sandbox Code Playgroud)

没有DataFrame等价物,但它只是一个简单的单线程问题:

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))

unionAll(df1, df2, df3).show()

## +---+----+
## |  k|   v|
## +---+----+
## |  1|foo1|
## |  2|bar1|
## |  3|foo2|
## |  4|bar2|
## |  5|foo3|
## |  6|bar3|
## +---+----+
Run Code Online (Sandbox Code Playgroud)

如果DataFrames使用SparkContext.unionRDD的数量很大并且重新创建DataFrame可能是避免与准备执行计划的成本相关的问题的更好选择:

def unionAll(*dfs):
    first, *_ = dfs  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )
Run Code Online (Sandbox Code Playgroud)

  • @drkostas 可能不是最好的方法,但我通过保存 RDD 然后加载它并继续循环来解决这个问题。这会杀死 RDD 的历史记录,你会减慢速度,因为它会为每个新循环重新运行 RDD 历史记录中的每个循环。Spark 不喜欢循环 (3认同)
  • 我想在单行 DF 之间执行大约 3000 个并集。使用第一个选项,在第 100 次迭代后它会以指数方式变慢(我正在使用 tqdm 进行测试)。使用第二个选项,它从一开始就非常慢,并且不断线性减速。有更好的方法吗? (2认同)
  • @Gramatik是的,我也用同样的方式解决了。通过使用选项“append”将每个数据帧保存在镶木地板中,然后将镶木地板加载到新的数据帧中。 (2认同)