pyspark通过特定键加入rdds

dag*_*g3r 3 join rdd pyspark

我有两个rdds,需要将它们一起加入。它们如下所示:

RDD1

[(u'2', u'100', 2),
 (u'1', u'300', 1),
 (u'1', u'200', 1)]
Run Code Online (Sandbox Code Playgroud)

RDD2

[(u'1', u'2'), (u'1', u'3')]
Run Code Online (Sandbox Code Playgroud)

我想要的输出是:

[(u'1', u'2', u'100', 2)]
Run Code Online (Sandbox Code Playgroud)

因此,我想从RDD2中选择具有相同RDD1第二个值的那些。我尝试了加入,也尝试了笛卡尔,但没有一个正在工作,甚至没有接近我想要的东西。我是Spark的新手,非常感谢你们的任何帮助。

谢谢

Rak*_*mar 5

对我来说,您的过程看起来像手动的。这是示例代码:-

rdd = sc.parallelize([(u'2', u'100', 2),(u'1', u'300', 1),(u'1', u'200', 1)])
rdd1 = sc.parallelize([(u'1', u'2'), (u'1', u'3')])
newRdd = rdd1.map(lambda x:(x[1],x[0])).join(rdd.map(lambda x:(x[0],(x[1],x[2]))))
newRdd.map(lambda x:(x[1][0], x[0], x[1][1][0], x[1][1][1])).coalesce(1).collect()
Run Code Online (Sandbox Code Playgroud)

输出:-

[(u'1', u'2', u'100', 2)]
Run Code Online (Sandbox Code Playgroud)


tit*_*ata 5

数据框如果您允许在解决方案中使用Spark数据框。您可以将给定的RDD转换为数据框,并将相应的列连接在一起。

df1 = spark.createDataFrame(rdd1, schema=['a', 'b', 'c'])
df2 = spark.createDataFrame(rdd2, schema=['d', 'a'])
rdd_join = df1.join(df2, on='a')
out = rdd_join.rdd.collect()
Run Code Online (Sandbox Code Playgroud)

RDD只需将您想加入的键压缩到第一个元素,并简单地用于join进行结合

rdd1_zip = rdd1.map(lambda x: (x[0], (x[1], x[2])))
rdd2_zip = rdd2.map(lambda x: (x[1], x[0]))
rdd_join = rdd1_zip.join(rdd2_zip)
rdd_out = rdd_join.map(lambda x: (x[0], x[1][0][0], x[1][0][1], x[1][1])).collect() # flatten the rdd
print(rdd_out)
Run Code Online (Sandbox Code Playgroud)