inv*_*ell 16 python join apache-spark rdd pyspark
你将如何使用python在Spark中执行基本连接?在R中你可以使用merg()来做到这一点.使用python on spark的语法是什么:
使用两个表(RDD),每个表中都有一个具有公共密钥的列.
RDD(1):(key,U)
RDD(2):(key,V)
Run Code Online (Sandbox Code Playgroud)
我认为内部联接是这样的:
rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));
Run Code Online (Sandbox Code Playgroud)
是对的吗?我在互联网上搜索过,无法找到一个很好的连接示例.提前致谢.
zer*_*323 35
它可以使用PairRDDFunctions或Spark数据帧完成.由于数据帧操作受益于Catalyst Optimizer,因此第二种选择值得考虑.
假设您的数据如下所示:
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])
Run Code Online (Sandbox Code Playgroud)
内部联接:
rdd1.join(rdd2)
Run Code Online (Sandbox Code Playgroud)
左外连接:
rdd1.leftOuterJoin(rdd2)
Run Code Online (Sandbox Code Playgroud)
笛卡儿积(不需要RDD[(T, U)]):
rdd1.cartesian(rdd2)
Run Code Online (Sandbox Code Playgroud)
广播加入(不需要RDD[(T, U)]):
最后,cogroup它没有直接的SQL等价物,但在某些情况下可能很有用:
cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
Run Code Online (Sandbox Code Playgroud)
您可以使用SQL DSL或执行原始SQL sqlContext.sql.
df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
# Register temporary tables to be able to use sqlContext.sql
df1.createTempView('df1')
df2.createTempView('df2')
Run Code Online (Sandbox Code Playgroud)
内部联接:
# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
Run Code Online (Sandbox Code Playgroud)
左外连接:
df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
Run Code Online (Sandbox Code Playgroud)
交叉连接(Spark.2.0中需要显式交叉连接或配置更改 - Spark 2.x的spark.sql.crossJoin.enabled):
df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')
Run Code Online (Sandbox Code Playgroud)
df1.join(df2)
sqlContext.sql('SELECT * FROM df JOIN df2')
Run Code Online (Sandbox Code Playgroud)
由于1.6(Scala中为1.5),每个都可以与broadcast功能组合:
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), df1.k == df2.k)
Run Code Online (Sandbox Code Playgroud)
执行广播加入.另请参见为什么我的BroadcastHashJoin比Spark中的ShuffledHashJoin慢
| 归档时间: |
|
| 查看次数: |
43545 次 |
| 最近记录: |