小编Fre*_*ira的帖子

'PipelinedRDD'对象在PySpark中没有属性'toDF'

我正在尝试加载SVM文件并将其转换为一个,DataFrame因此我可以使用PipelineSpark 的ML模块(ML).我刚刚在Ubuntu 14.04上安装了一个新的Spark 1.5.0(没有spark-env.sh配置).

my_script.py是:

from pyspark.mllib.util import MLUtils
from pyspark import SparkContext

sc = SparkContext("local", "Teste Original")
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)

我正在使用: ./spark-submit my_script.py

我收到错误:

Traceback (most recent call last):
File "/home/fred-spark/spark-1.5.0-bin-hadoop2.6/pipeline_teste_original.py", line 34, in <module>
data = MLUtils.loadLibSVMFile(sc, "/home/fred-spark/svm_capture").toDF()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
Run Code Online (Sandbox Code Playgroud)

我无法理解的是,如果我跑:

data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)

直接在PySpark shell中,它的工作原理.

python apache-spark rdd apache-spark-sql pyspark

45
推荐指数
1
解决办法
4万
查看次数

Spark 2.0读取csv分区数(PySpark)

我正在尝试使用Spark 2.0中的新东西将一些代码从Spark 1.6移植到Spark 2.0.首先,我想使用Spark 2.0的csv阅读器.顺便说一下,我正在使用pyspark.

使用"旧" textFile功能,我可以设置最小分区数.例如:

file= sc.textFile('/home/xpto/text.csv', minPartitions=10)
header = file.first() #extract header
data = file.filter(lambda x:x !=header) #csv without header
...
Run Code Online (Sandbox Code Playgroud)

现在,使用Spark 2.0,我可以直接读取csv:

df = spark.read.csv('/home/xpto/text.csv', header=True)
...
Run Code Online (Sandbox Code Playgroud)

但我没有找到一种方法来设置minPartitions.

我需要这个来测试我的代码的性能.

谢谢,弗雷德

csv apache-spark pyspark

5
推荐指数
1
解决办法
1万
查看次数

标签 统计

apache-spark ×2

pyspark ×2

apache-spark-sql ×1

csv ×1

python ×1

rdd ×1