我有一个数据帧df有VectorUDT指定的列features.如何获取列的元素,比如第一个元素?
我尝试过以下操作
from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()
Run Code Online (Sandbox Code Playgroud)
但是我收到了一个net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype)错误.如果我first_elem_udf = first_elem_udf(lambda row: row.toArray()[0])改为相同的错误.
我也试过,explode()但我得到一个错误,因为它需要一个数组或地图类型.
我认为这应该是一种常见的操作.
dataframe apache-spark apache-spark-sql pyspark apache-spark-ml
我正在努力保存ML Pipeline生产的数千种型号.正如在答复中指出这里,该机型可以保存如下:
import java.io._
def saveModel(name: String, model: PipelineModel) = {
val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name"))
oos.writeObject(model)
oos.close
}
schools.zip(bySchoolArrayModels).foreach{
case (name, model) => saveModel(name, Model)
}
Run Code Online (Sandbox Code Playgroud)
我已经尝试使用s3://some/path/$name,/user/hadoop/some/path/$name因为我希望模型最终保存到亚马逊s3,但它们都失败,并显示无法找到路径的消息.
如何将模型保存到Amazon S3?
我有一个DataFrame看起来如下:
userID, category, frequency
1,cat1,1
1,cat2,3
1,cat9,5
2,cat4,6
2,cat9,2
2,cat10,1
3,cat1,5
3,cat7,16
3,cat8,2
Run Code Online (Sandbox Code Playgroud)
不同类别的数量是10,我想为每个userID类别创建一个特征向量,并用零填充缺少的类别.
所以输出将是这样的:
userID,feature
1,[1,3,0,0,0,0,0,0,5,0]
2,[0,0,0,6,0,0,0,0,2,1]
3,[5,0,0,0,0,0,16,2,0,0]
Run Code Online (Sandbox Code Playgroud)
这只是一个说明性的例子,实际上我有大约200,000个唯一的userID和300个独特的类别.
创建功能的最有效方法是什么DataFrame?
我的目标是构建一个多字符分类器.
我已经构建了一个用于特征提取的管道,它包括一个StringIndexer转换器,用于将每个类名映射到一个标签,该标签将用于分类器训练步骤.
管道安装在训练集上.
测试集必须由拟合的管道处理,以便提取相同的特征向量.
知道我的测试集文件具有与训练集相同的结构.这里可能的情况是在测试集中面对一个看不见的类名,在这种情况下,StringIndexer将无法找到标签,并且将引发异常.
这种情况有解决方案吗?或者我们如何避免这种情况发生?
我想制作libsvm格式,所以我将数据帧设置为所需的格式,但我不知道如何转换为libsvm格式.格式如图所示.我希望所需的libsvm类型是用户项:rating.如果您知道在当前情况下该怎么做:
val ratings = sc.textFile(new File("/user/ubuntu/kang/0829/rawRatings.csv").toString).map { line =>
val fields = line.split(",")
(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
}
val user = ratings.map{ case (user,product,rate) => (user,(product.toInt,rate.toDouble))}
val usergroup = user.groupByKey
val data =usergroup.map{ case(x,iter) => (x,iter.map(_._1).toArray,iter.map(_._2).toArray)}
val data_DF = data.toDF("user","item","rating")
Run Code Online (Sandbox Code Playgroud)

我正在使用Spark 2.0.
libsvm apache-spark apache-spark-sql apache-spark-ml apache-spark-mllib
遵循Spark MLlib指南, 我们可以读到Spark有两个机器学习库:
spark.mllib,建立在RDD之上.spark.ml,建立在Dataframes之上.根据 StackOverflow上的这个和这个问题,Dataframes比RDD更好(和更新),应尽可能使用.
问题是我想使用常见的机器学习算法(例如:Frequent Pattern Mining,Naive Bayes等)和spark.ml(对于数据帧)不提供此类方法,仅spark.mllib(对于RDD)提供此算法.
如果Dataframes比RDD更好并且推荐指南推荐使用spark.ml,为什么不能在该lib中实现常见的机器学习方法?
machine-learning apache-spark pyspark apache-spark-ml apache-spark-mllib
在为要素中的字符串列编制索引时,我对PySpark有一个奇怪的问题.这是我的tmp.csv文件:
x0,x1,x2,x3
asd2s,1e1e,1.1,0
asd2s,1e1e,0.1,0
,1e3e,1.2,0
bd34t,1e1e,5.1,1
asd2s,1e3e,0.2,0
bd34t,1e2e,4.3,1
Run Code Online (Sandbox Code Playgroud)
我在'x0'中有一个缺失值.首先,我正在使用pyspark_csv将csv文件中的功能读入DataFrame:https://github.com/seahboonsiew/pyspark-csv 然后使用StringIndexer索引x0:
import pyspark_csv as pycsv
from pyspark.ml.feature import StringIndexer
sc.addPyFile('pyspark_csv.py')
features = pycsv.csvToDataFrame(sqlCtx, sc.textFile('tmp.csv'))
indexer = StringIndexer(inputCol='x0', outputCol='x0_idx' )
ind = indexer.fit(features).transform(features)
print ind.collect()
Run Code Online (Sandbox Code Playgroud)
当调用''ind.collect()''时,Spark会抛出java.lang.NullPointerException.一切都适用于完整的数据集,例如,对于'x1'.
有没有人知道造成这种情况的原因以及如何解决这个问题?
提前致谢!
谢尔盖
更新:
我使用Spark 1.5.1.确切的错误:
File "/spark/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 258, in show
print(self._jdf.showString(n))
File "/spark/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/spark/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o444.showString.
: java.lang.NullPointerException
at org.apache.spark.sql.types.Metadata$.org$apache$spark$sql$types$Metadata$$hash(Metadata.scala:208)
at org.apache.spark.sql.types.Metadata$$anonfun$org$apache$spark$sql$types$Metadata$$hash$2.apply(Metadata.scala:196)
at org.apache.spark.sql.types.Metadata$$anonfun$org$apache$spark$sql$types$Metadata$$hash$2.apply(Metadata.scala:196)
... etc
Run Code Online (Sandbox Code Playgroud)
我试图在不读取csv文件的情况下创建相同的DataFrame, …
python apache-spark apache-spark-sql pyspark apache-spark-ml
我有一个带有元组值的RDD(String,SparseVector),我想使用RDD创建一个DataFrame.获取(label:string,features:vector)DataFrame,它是大多数ml算法库所需的Schema.我知道可以这样做,因为 当给定DataFrame的features列时,HashingTF ml Library会输出一个向量.
temp_df = sqlContext.createDataFrame(temp_rdd, StructType([
StructField("label", DoubleType(), False),
StructField("tokens", ArrayType(StringType()), False)
]))
#assumming there is an RDD (double,array(strings))
hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features")
ndf = hashingTF.transform(temp_df)
ndf.printSchema()
#outputs
#root
#|-- label: double (nullable = false)
#|-- tokens: array (nullable = false)
#| |-- element: string (containsNull = true)
#|-- features: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)
所以我的问题是,我能以某种方式将(String,SparseVector)的RDD转换为(String,vector)的DataFrame.我试着平常,sqlContext.createDataFrame但没有DataType符合我的需求.
df = sqlContext.createDataFrame(rdd,StructType([ …Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql pyspark apache-spark-ml apache-spark-mllib
我在Spark 1.5.1中有一个spark.ml管道,它由一系列变换器和一个k-means估算器组成.我希望能够在安装管道后访问KMeansModel .clusterCenters,但无法弄清楚如何.是否有一个spark.ml相当于sklearn的pipeline.named_steps功能?
我发现这个答案有两个选择.如果我从我的管道中取出k-means模型并单独使用它,那么第一个工作,但这有点挫败了管道的目的.第二个选项不起作用 - 我明白了error: value getModel is not a member of org.apache.spark.ml.PipelineModel.
编辑:示例管道:
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.Pipeline
// create example dataframe
val sentenceData = sqlContext.createDataFrame(Seq(
("Hi I heard about Spark"),
("I wish Java could use case classes"),
("K-means models are neat")
)).toDF("sentence")
// initialize pipeline stages
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features").setNumFeatures(20)
val kmeans = new KMeans()
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, kmeans)) …Run Code Online (Sandbox Code Playgroud) 我想Estimator在PySpark MLlib中构建一个简单的自定义.我在这里可以编写一个自定义的Transformer,但我不知道如何在一个Estimator.我也不明白是什么@keyword_only以及为什么我需要这么多的二传手和吸气剂.Scikit-learn似乎有一个适用于自定义模型的文档(请参阅此处,但PySpark没有.
示例模型的伪代码:
class NormalDeviation():
def __init__(self, threshold = 3):
def fit(x, y=None):
self.model = {'mean': x.mean(), 'std': x.std()]
def predict(x):
return ((x-self.model['mean']) > self.threshold * self.model['std'])
def decision_function(x): # does ml-lib support this?
Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark apache-spark-ml apache-spark-mllib
apache-spark ×10
apache-spark-ml ×10
pyspark ×5
scala ×3
python ×2
dataframe ×1
java ×1
libsvm ×1
pipeline ×1