Osc*_* C. 3 python apache-spark rdd spark-dataframe
嗨,我在笔记本中有以下代码,并通过托盘进行代码处理python spark:
mydataNoSQL.createOrReplaceTempView("mytable")
spark.sql("SELECT * from mytable")
return mydataNoSQL
def getsameData(df,spark):
result = spark.sql("select * from mytable where temeperature is not null")
return result.rdd.sample(False, 0.1).map(lambda row : (row.temperature))
Run Code Online (Sandbox Code Playgroud)
我需要一个实例RDD,但正在获取类'pyspark.rdd.PipelinedRDD'
任何帮助都会很好。
pyspark.rdd.PipelinedRDD是的子类,RDD并且必须具有RDD中定义的所有API。即。PipelinedRDD只是一种特殊类型,RDD当您在上运行map函数时会创建该类型RDD。
例如,请看下面的代码片段。
>>> rdd = spark.sparkContext.parallelize(range(1,10))
>>> type(rdd)
<class 'pyspark.rdd.RDD'> ## the type is RDD here
>>> rdd = rdd.map(lambda x: x * x)
>>> type(rdd)
<class 'pyspark.rdd.PipelinedRDD'> ## after the map operation the type is changed to pyspark.rdd.PipelinedRDD
Run Code Online (Sandbox Code Playgroud)
因此,您应该只将自己的代码pyspark.rdd.PipelinedRDD视为RDD。
由于Python是动态类型的语言,因此在Python中没有完整的转换支持。强制将您转换pyspark.rdd.PipelinedRDD为正常的RDD,您可以在rdd上收集并并行化
>>> rdd = spark.sparkContext.parallelize(rdd.collect())
>>> type(rdd)
<class 'pyspark.rdd.RDD'>
Run Code Online (Sandbox Code Playgroud)
如果collectRDD MemoryError的数据很大,则可能在RDD上运行。