我正在尝试加载SVM文件并将其转换为一个,DataFrame因此我可以使用PipelineSpark 的ML模块(ML).我刚刚在Ubuntu 14.04上安装了一个新的Spark 1.5.0(没有spark-env.sh配置).
我my_script.py是:
from pyspark.mllib.util import MLUtils
from pyspark import SparkContext
sc = SparkContext("local", "Teste Original")
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)
我正在使用: ./spark-submit my_script.py
我收到错误:
Traceback (most recent call last):
File "/home/fred-spark/spark-1.5.0-bin-hadoop2.6/pipeline_teste_original.py", line 34, in <module>
data = MLUtils.loadLibSVMFile(sc, "/home/fred-spark/svm_capture").toDF()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
Run Code Online (Sandbox Code Playgroud)
我无法理解的是,如果我跑:
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)
直接在PySpark shell中,它的工作原理.
我正在尝试使用Spark 2.0中的新东西将一些代码从Spark 1.6移植到Spark 2.0.首先,我想使用Spark 2.0的csv阅读器.顺便说一下,我正在使用pyspark.
使用"旧" textFile功能,我可以设置最小分区数.例如:
file= sc.textFile('/home/xpto/text.csv', minPartitions=10)
header = file.first() #extract header
data = file.filter(lambda x:x !=header) #csv without header
...
Run Code Online (Sandbox Code Playgroud)
现在,使用Spark 2.0,我可以直接读取csv:
df = spark.read.csv('/home/xpto/text.csv', header=True)
...
Run Code Online (Sandbox Code Playgroud)
但我没有找到一种方法来设置minPartitions.
我需要这个来测试我的代码的性能.
谢谢,弗雷德