Pyspark 申请 foreach

And*_*gel 3 apache-spark spark-streaming pyspark

我是 Pyspark 的菜鸟,我假装玩了几个函数来更好地理解如何在更现实的场景中使用它们。有一段时间,我尝试将特定函数应用于 RDD 中的每个数字。我的问题基本上是,当我尝试打印从 RDD 中抓取的内容时,结果是 None

我的代码:

from pyspark import SparkConf , SparkContext

conf = SparkConf().setAppName('test')
sc = SparkContext(conf=conf)

sc.setLogLevel("WARN")


changed = []

def div_two (n):
    opera = n / 2
    return opera

numbers = [8,40,20,30,60,90]

numbersRDD = sc.parallelize(numbers)

changed.append(numbersRDD.foreach(lambda x: div_two(x)))

#result = numbersRDD.map(lambda x: div_two(x))

for i in changed:
    print(i) 
Run Code Online (Sandbox Code Playgroud)

我很欣赏关于为什么这会在列表中出现 Null 以及使用 foreach 实现这一目标的正确方法(无论是否可能)的明确解释。

谢谢

Ram*_*jan 5

您的函数定义div_two似乎很好,但还可以简化为

def div_two (n):
    return n/2
Run Code Online (Sandbox Code Playgroud)

你已经将整数数组转换为rdd,这也很好。

主要问题是您试图将rdds添加到使用function更改的数组中foreach。但是如果你看一下定义foreach

def foreach(self, f) Inferred type: (self: RDD, f: Any) -> None
Run Code Online (Sandbox Code Playgroud)

这表示返回类型是 None. 这就是正在打印的内容。

你不需要打印改变的一个数组变量元素的的RDD。您可以简单地编写一个用于打印的函数并在foreach函数中调用该函数

def printing(x):
    print x

numbersRDD.map(div_two).foreach(printing)
Run Code Online (Sandbox Code Playgroud)

您应该打印结果。

您仍然可以将rdd加到一个array变量中,但rdds它本身是分布式集合,Array也是一个集合。所以,如果你添加RDD到一个数组中,你将有收集收集,这意味着你应该写两个循环

changed.append(numbersRDD.map(div_two))

def printing(x):
    print x

for i in changed:
    i.foreach(printing)
Run Code Online (Sandbox Code Playgroud)

您的代码和我的代码之间的主要区别在于,我在将 rdd 添加到 changed variable 时使用了 map (这是一种转换)而不是 foreach (这是一种操作)。我使用了两个循环来打印元素rdd