如何从pyspark.rdd.PipelinedRDD筛选出值?

Din*_*ius 2 python apache-spark rdd pyspark

我有一个pyspark.rdd.PipelinedRDD电话myRDD。这是其示例内容:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),
 ((333, u'BB', u'B'), (999, u'BB', u'A')),...]
Run Code Online (Sandbox Code Playgroud)

我需要删除第三列值不一致的所有条目。预期的结果是这样的:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),...]
Run Code Online (Sandbox Code Playgroud)

我该怎么做?

Var*_*uri 5

您可以使用带有lambda表达式的filter来检查每个元组对的第三个元素是否相同,例如:

l = [((111, u'BB', u'A'), (444, u'BB', u'A')),
     ((222, u'BB', u'A'), (888, u'BB', u'A')),
     ((333, u'BB', u'B'), (999, u'BB', u'A'))]

rdd = sc.parallelize(l)
rdd = rdd.filter(lambda x: x[0][2] == x[1][2])
result = rdd.collect()
print result

>>> [((111, u'BB', u'A'), (444, u'BB', u'A')), ((222, u'BB', u'A'), (888, u'BB', u'A'))]
Run Code Online (Sandbox Code Playgroud)

要回答您的后续评论,请记住,lambda只是一个函数,如果您具有更复杂的逻辑,则可以将其写为一个函数。您可以执行以下操作:

def do_stuff(x):
    if (x[0][2] == 'C') or (x[1][2] == 'C'):
        return x     
    else:
        if x[0][2] == x[1][2]: return x
    return None

rdd = rdd.map(do_stuff).filter(lambda x: x is not None)

res = rdd.collect()
Run Code Online (Sandbox Code Playgroud)