标签: apache-spark-ml

如何访问Spark DataFrame中VectorUDT列的元素?

我有一个数据帧dfVectorUDT指定的列features.如何获取列的元素,比如第一个元素?

我尝试过以下操作

from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()
Run Code Online (Sandbox Code Playgroud)

但是我收到了一个net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype)错误.如果我first_elem_udf = first_elem_udf(lambda row: row.toArray()[0])改为相同的错误.

我也试过,explode()但我得到一个错误,因为它需要一个数组或地图类型.

我认为这应该是一种常见的操作.

dataframe apache-spark apache-spark-sql pyspark apache-spark-ml

16
推荐指数
1
解决办法
5886
查看次数

如何将模型从ML Pipeline保存到S3或HDFS?

我正在努力保存ML Pipeline生产的数千种型号.正如在答复中指出这里,该机型可以保存如下:

import java.io._

def saveModel(name: String, model: PipelineModel) = {
  val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name"))
  oos.writeObject(model)
  oos.close
}

schools.zip(bySchoolArrayModels).foreach{
  case (name, model) => saveModel(name, Model)
}
Run Code Online (Sandbox Code Playgroud)

我已经尝试使用s3://some/path/$name,/user/hadoop/some/path/$name因为我希望模型最终保存到亚马逊s3,但它们都失败,并显示无法找到路径的消息.

如何将模型保存到Amazon S3?

java scala apache-spark apache-spark-ml apache-spark-mllib

15
推荐指数
2
解决办法
9289
查看次数

Spark,Scala,DataFrame:创建特征向量

我有一个DataFrame看起来如下:

userID, category, frequency
1,cat1,1
1,cat2,3
1,cat9,5
2,cat4,6
2,cat9,2
2,cat10,1
3,cat1,5
3,cat7,16
3,cat8,2
Run Code Online (Sandbox Code Playgroud)

不同类别的数量是10,我想为每个userID类别创建一个特征向量,并用零填充缺少的类别.

所以输出将是这样的:

userID,feature
1,[1,3,0,0,0,0,0,0,5,0]
2,[0,0,0,6,0,0,0,0,2,1]
3,[5,0,0,0,0,0,16,2,0,0]
Run Code Online (Sandbox Code Playgroud)

这只是一个说明性的例子,实际上我有大约200,000个唯一的userID和300个独特的类别.

创建功能的最有效方法是什么DataFrame

scala apache-spark apache-spark-sql apache-spark-ml

15
推荐指数
3
解决办法
9629
查看次数

Spark,ML,StringIndexer:处理看不见的标签

我的目标是构建一个多字符分类器.

我已经构建了一个用于特征提取的管道,它包括一个StringIndexer转换器,用于将每个类名映射到一个标签,该标签将用于分类器训练步骤.

管道安装在训练集上.

测试集必须由拟合的管道处理,以便提取相同的特征向量.

知道我的测试集文件具有与训练集相同的结构.这里可能的情况是在测试集中面对一个看不见的类名,在这种情况下,StringIndexer将无法找到标签,并且将引发异常.

这种情况有解决方案吗?或者我们如何避免这种情况发生?

apache-spark apache-spark-ml

15
推荐指数
3
解决办法
2万
查看次数

如何从DataFrame准备数据到LibSVM格式?

我想制作libsvm格式,所以我将数据帧设置为所需的格式,但我不知道如何转换为libsvm格式.格式如图所示.我希望所需的libsvm类型是用户项:rating.如果您知道在当前情况下该怎么做:

val ratings = sc.textFile(new File("/user/ubuntu/kang/0829/rawRatings.csv").toString).map { line =>
     val fields = line.split(",")
      (fields(0).toInt,fields(1).toInt,fields(2).toDouble)
}
val user = ratings.map{ case (user,product,rate) => (user,(product.toInt,rate.toDouble))}
val usergroup = user.groupByKey 

val data =usergroup.map{ case(x,iter) => (x,iter.map(_._1).toArray,iter.map(_._2).toArray)}

val data_DF = data.toDF("user","item","rating")
Run Code Online (Sandbox Code Playgroud)

数据框图

我正在使用Spark 2.0.

libsvm apache-spark apache-spark-sql apache-spark-ml apache-spark-mllib

15
推荐指数
1
解决办法
2万
查看次数

为什么spark.ml不实现任何spark.mllib算法?

遵循Spark MLlib指南, 我们可以读到Spark有两个机器学习库:

  • spark.mllib,建立在RDD之上.
  • spark.ml,建立在Dataframes之上.

根据 StackOverflow上的这个这个问题,Dataframes比RDD更好(和更新),应尽可能使用.

问题是我想使用常见的机器学习算法(例如:Frequent Pattern Mining,Naive Bayes等)和spark.ml(对于数据帧)不提供此类方法,仅spark.mllib(对于RDD)提供此算法.

如果Dataframes比RDD更好并且推荐指南推荐使用spark.ml,为什么不能在该lib中实现常见的机器学习方法?

这里遗漏的是什么?

machine-learning apache-spark pyspark apache-spark-ml apache-spark-mllib

14
推荐指数
1
解决办法
3104
查看次数

遇到缺失功能时,Apache Spark会抛出NullPointerException

在为要素中的字符串列编制索引时,我对PySpark有一个奇怪的问题.这是我的tmp.csv文件:

x0,x1,x2,x3 
asd2s,1e1e,1.1,0
asd2s,1e1e,0.1,0
,1e3e,1.2,0
bd34t,1e1e,5.1,1
asd2s,1e3e,0.2,0
bd34t,1e2e,4.3,1
Run Code Online (Sandbox Code Playgroud)

我在'x0'中有一个缺失值.首先,我正在使用pyspark_csv将csv文件中的功能读入DataFrame:https://github.com/seahboonsiew/pyspark-csv 然后使用StringIndexer索引x0:

import pyspark_csv as pycsv
from pyspark.ml.feature import StringIndexer

sc.addPyFile('pyspark_csv.py')

features = pycsv.csvToDataFrame(sqlCtx, sc.textFile('tmp.csv'))
indexer = StringIndexer(inputCol='x0', outputCol='x0_idx' )
ind = indexer.fit(features).transform(features)
print ind.collect()
Run Code Online (Sandbox Code Playgroud)

当调用''ind.collect()''时,Spark会抛出java.lang.NullPointerException.一切都适用于完整的数据集,例如,对于'x1'.

有没有人知道造成这种情况的原因以及如何解决这个问题?

提前致谢!

谢尔盖

更新:

我使用Spark 1.5.1.确切的错误:

File "/spark/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 258, in show
print(self._jdf.showString(n))

File "/spark/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__

File "/spark/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o444.showString.
: java.lang.NullPointerException
at org.apache.spark.sql.types.Metadata$.org$apache$spark$sql$types$Metadata$$hash(Metadata.scala:208)
at org.apache.spark.sql.types.Metadata$$anonfun$org$apache$spark$sql$types$Metadata$$hash$2.apply(Metadata.scala:196)
at org.apache.spark.sql.types.Metadata$$anonfun$org$apache$spark$sql$types$Metadata$$hash$2.apply(Metadata.scala:196)
... etc
Run Code Online (Sandbox Code Playgroud)

我试图在不读取csv文件的情况下创建相同的DataFrame, …

python apache-spark apache-spark-sql pyspark apache-spark-ml

14
推荐指数
1
解决办法
8325
查看次数

如何将具有SparseVector列的RDD转换为具有列为Vector的DataFrame

我有一个带有元组值的RDD(String,SparseVector),我想使用RDD创建一个DataFrame.获取(label:string,features:vector)DataFrame,它是大多数ml算法库所需的Schema.我知道可以这样做,因为 当给定DataFrame的features列时,HashingTF ml Library会输出一个向量.

temp_df = sqlContext.createDataFrame(temp_rdd, StructType([
        StructField("label", DoubleType(), False),
        StructField("tokens", ArrayType(StringType()), False)
    ]))

#assumming there is an RDD (double,array(strings))

hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features")

ndf = hashingTF.transform(temp_df)
ndf.printSchema()

#outputs 
#root
#|-- label: double (nullable = false)
#|-- tokens: array (nullable = false)
#|    |-- element: string (containsNull = true)
#|-- features: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)

所以我的问题是,我能以某种方式将(String,SparseVector)的RDD转换为(String,vector)的DataFrame.我试着平常,sqlContext.createDataFrame但没有DataType符合我的需求.

df = sqlContext.createDataFrame(rdd,StructType([ …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark apache-spark-ml apache-spark-mllib

13
推荐指数
1
解决办法
8073
查看次数

是否可以访问spark.ml管道中的估算器属性?

我在Spark 1.5.1中有一个spark.ml管道,它由一系列变换器和一个k-means估算器组成.我希望能够在安装管道后访问KMeansModel .clusterCenters,但无法弄清楚如何.是否有一个spark.ml相当于sklearn的pipeline.named_steps功能?

我发现这个答案有两个选择.如果我从我的管道中取出k-means模型并单独使用它,那么第一个工作,但这有点挫败了管道的目的.第二个选项不起作用 - 我明白了error: value getModel is not a member of org.apache.spark.ml.PipelineModel.

编辑:示例管道:

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.Pipeline

// create example dataframe
val sentenceData = sqlContext.createDataFrame(Seq(
  ("Hi I heard about Spark"),
  ("I wish Java could use case classes"),
  ("K-means models are neat")
  )).toDF("sentence")

// initialize pipeline stages
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features").setNumFeatures(20)
val kmeans = new KMeans()
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, kmeans)) …
Run Code Online (Sandbox Code Playgroud)

pipeline scala apache-spark apache-spark-ml

13
推荐指数
1
解决办法
2041
查看次数

如何在PySpark mllib中滚动自定义估算器

我想Estimator在PySpark MLlib中构建一个简单的自定义.我在这里可以编写一个自定义的Transformer,但我不知道如何在一个Estimator.我也不明白是什么@keyword_only以及为什么我需要这么多的二传手和吸气剂.Scikit-learn似乎有一个适用于自定义模型的文档(请参阅此处,但PySpark没有.

示例模型的伪代码:

class NormalDeviation():
    def __init__(self, threshold = 3):
    def fit(x, y=None):
       self.model = {'mean': x.mean(), 'std': x.std()]
    def predict(x):
       return ((x-self.model['mean']) > self.threshold * self.model['std'])
    def decision_function(x): # does ml-lib support this?
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark apache-spark-ml apache-spark-mllib

13
推荐指数
1
解决办法
4313
查看次数