kra*_*ter 2 apache-spark pyspark
我一直在使用Ipython笔记本测试脚本并将pyspark传递给它。我想要完成的所有事情都运作良好。
我还使用pyspark从命令行在没有笔记本的情况下运行了它,并且可以工作。
使用1.3.1版
使用spark-submit提交为作业时
spark-submit --master local[*] myscript.py
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
x_map = rdd.map(lambda s: (s[1][1],s[1][3])).distinct().toDF().toPandas()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
Run Code Online (Sandbox Code Playgroud)
我的脚本的开头如下所示:
from pyspark import SparkContext
sc = SparkContext(appName="Whatever")
from pyspark.sql.types import *
from pyspark.sql import Row
import statsmodels.api as sm
import pandas as pd
import numpy as np
import sys
[..] other python modules
rdd = sc.textFile(input_file)
rdd = rdd.map(lambda line: (line.split(",")[1],[x for x in line.split(",")])).sortByKey()
x_map = rdd.map(lambda s: (s[1][1],s[1][3])).distinct().toDF().toPandas()
Run Code Online (Sandbox Code Playgroud)
您可以在以下链接中阅读:http : //spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html
创建后,SQLContext将一个名为toDF的方法添加到RDD中,该方法可用于将RDD转换为DataFrame,这是SQLContext.createDataFrame()的简写形式。
因此,为了在RDD中使用toDF方法,您需要创建一个sqlContext并使用SparkContext对其进行初始化:
from pyspark.sql import SQLContext
...
sqlContext = SQLContext(sc)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2427 次 |
最近记录: |