需要RDD的实例,但返回类'pyspark.rdd.PipelinedRDD'

Osc*_* C. 3 python apache-spark rdd spark-dataframe

嗨,我在笔记本中有以下代码,并通过托盘进行代码处理python spark:

 mydataNoSQL.createOrReplaceTempView("mytable")
 spark.sql("SELECT * from mytable")
 return mydataNoSQL

def getsameData(df,spark):
result = spark.sql("select * from mytable where temeperature is not null")
return result.rdd.sample(False, 0.1).map(lambda row : (row.temperature))
Run Code Online (Sandbox Code Playgroud)

我需要一个实例RDD,但正在获取类'pyspark.rdd.PipelinedRDD'

任何帮助都会很好。

rog*_*one 5

pyspark.rdd.PipelinedRDD是的子类,RDD并且必须具有RDD中定义的所有API。即。PipelinedRDD只是一种特殊类型,RDD当您在上运行map函数时会创建该类型RDD

例如,请看下面的代码片段。

>>> rdd = spark.sparkContext.parallelize(range(1,10))
>>> type(rdd)
<class 'pyspark.rdd.RDD'> ## the type is RDD here
>>> rdd = rdd.map(lambda x: x * x)
>>> type(rdd)
<class 'pyspark.rdd.PipelinedRDD'> ## after the map operation the type is changed to pyspark.rdd.PipelinedRDD
Run Code Online (Sandbox Code Playgroud)

因此,您应该只将自己的代码pyspark.rdd.PipelinedRDD视为RDD

由于Python是动态类型的语言,因此在Python中没有完整的转换支持。强制将您转换pyspark.rdd.PipelinedRDD为正常的RDD,您可以在rdd上收集并并行化

>>> rdd = spark.sparkContext.parallelize(rdd.collect())
>>> type(rdd)
<class 'pyspark.rdd.RDD'>
Run Code Online (Sandbox Code Playgroud)

如果collectRDD MemoryError的数据很大,则可能在RDD上运行。