我有一个数据集(user, product, review),并希望将其提供给mllib的ALS算法.
该算法需要用户和产品为数字,而我的是String用户名和字符串SKU.
现在,我获得了不同的用户和SKU,然后在Spark之外为他们分配数字ID.
我想知道是否有更好的方法来做到这一点.我想到的一种方法是编写一个自定义RDD,基本上枚举1到n,然后在两个RDD上调用zip.
我正在使用Spark 2.1.1处理具有~2000个特征的数据集,并尝试创建一个基本的ML管道,包括一些变形金刚和分类器.
让我们假设为了简单起见,我正在使用的Pipeline包含一个VectorAssembler,StringIndexer和一个Classifier,这将是一个相当常见的用例.
// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("featuresRaw")
val labelIndexer: StringIndexer = new StringIndexer()
.setInputCol("TARGET")
.setOutputCol("indexedLabel")
// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("featuresRaw")
.setMaxBins(30)
// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(5))
.addGrid(rf.maxDepth, Array(5))
.build()
// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setLabelCol("indexedLabel")
Run Code Online (Sandbox Code Playgroud)
如果管道步骤被分成变换器管道(VectorAssembler + StringIndexer)和第二个分类器管道,并且如果在两个管道之间删除不必要的列,则训练成功.这意味着重新使用模型,必须在训练后保存两个PipelineModel,并且必须引入中间预处理步骤.
// …Run Code Online (Sandbox Code Playgroud) 如何在使用Python保存在磁盘上的稀疏CSR数组的块上并行应用某些函数?顺序地,这可以例如通过保存CSR阵列并且joblib.dump打开它joblib.load(.., mmap_mode="r")并逐个处理行的块来完成.使用dask可以更有效地完成这项工作吗?
特别是,假设一个人不需要在稀疏数组上完成所有可能的核心操作,而只需要并行加载行块(每个块是一个CSR数组)并对它们应用一些函数(在我的情况下它会例如estimator.predict(X)来自scikit-learn).
此外,磁盘上是否有适合此任务的文件格式?Joblib有效,但我不确定作为内存映射加载的CSR数组的(并行)性能; spark.mllib似乎使用一些自定义稀疏存储格式(似乎没有纯Python解析器)或LIBSVM格式(根据我的经验,scikit-learn中的解析器比它慢得多joblib.dump)...
注意:我在https://github.com/dask/dask/上阅读了文档,有关它的各种问题,但我仍然不确定如何最好地解决这个问题.
编辑:为了给出一个更实际的例子,下面是在密码数组的dask中工作的代码,但在使用带有此错误的稀疏数组时失败,
import numpy as np
import scipy.sparse
import joblib
import dask.array as da
from sklearn.utils import gen_batches
np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')
fh = joblib.load('X_dense.pkl', mmap_mode='r')
# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))
# computing the results with dask
x = da.from_array(fh, chunks=(2000)) …Run Code Online (Sandbox Code Playgroud) 如何处理与分类数据 spark-ml ,而不是 spark-mllib?
认为文档不是很清楚,似乎分类器例如RandomForestClassifier,LogisticRegression有一个featuresCol参数,它指定了特征列的名称DataFrame,以及一个labelCol参数,它指定了标记类的列的名称DataFrame.
显然我想在我的预测中使用多个功能,所以我尝试使用VectorAssembler将所有功能放在一个向量下featuresCol.
但是,VectorAssembler只接受数字类型,布尔类型和矢量类型(根据Spark网站),所以我不能在我的特征向量中添加字符串.
我该怎么办?
categorical-data apache-spark apache-spark-ml apache-spark-mllib
我原来的问题是为什么使用DecisionTreeModel.predict内部地图功能会引发异常?并且与如何使用MLlib在Spark上生成(原始标签,预测标签)的元组有关?
当我们使用Scala API时,推荐RDD[LabeledPoint]使用预测的方法DecisionTreeModel是简单地映射RDD:
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
Run Code Online (Sandbox Code Playgroud)
遗憾的是,PySpark中的类似方法效果不佳:
labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
Run Code Online (Sandbox Code Playgroud)
例外:您似乎尝试从广播变量,操作或转换引用SparkContext.SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用.有关更多信息,请参阅SPARK-5063.
而不是官方文档推荐这样的东西:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Run Code Online (Sandbox Code Playgroud)
那么这里发生了什么?此处没有广播变量,Scala API定义predict如下:
/**
* Predict values for a single data point using the model trained.
*
* @param features array representing …Run Code Online (Sandbox Code Playgroud) 我正在评估基于生产ML的应用程序的工具,我们的一个选项是Spark MLlib,但是我对如何在训练后提供模型服务有一些疑问?
例如,在Azure ML中,一旦经过培训,该模型将作为Web服务公开,可以从任何应用程序中使用,这与Amazon ML类似.
您如何在Apache Spark中提供/部署ML模型?
我试图通过使用Spark ML api运行随机森林分类,但我遇到了将正确的数据帧输入创建到管道中的问题.
以下是示例数据:
age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"
Run Code Online (Sandbox Code Playgroud)
age和hours_per_week是整数,而其他功能包括label salaryRange是分类(String)
加载这个csv文件(让我们称之为sample.csv)可以通过Spark csv库完成,如下所示:
val data = sqlContext.csvFile("/home/dusan/sample.csv")
Run Code Online (Sandbox Code Playgroud)
默认情况下,所有列都作为字符串导入,因此我们需要将"age"和"hours_per_week"更改为Int:
val toInt = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
Run Code Online (Sandbox Code Playgroud)
只是为了检查架构现在的样子:
scala> dataFixed.printSchema
root
|-- age: integer (nullable = true)
|-- hours_per_week: integer (nullable = true)
|-- education: string (nullable = true)
|-- sex: string (nullable = true)
|-- salaryRange: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
然后设置交叉验证器和管道:
val rf = new RandomForestClassifier()
val pipeline …Run Code Online (Sandbox Code Playgroud) 我注意到LinearRegressionModelSparkML中有两个类,一个在ML中,另一个在MLLib包中.
这两个实现方式完全不同 - 例如,一个来自MLLib工具Serializable,而另一个则没有.
顺便说一句,Ame是真实的RandomForestModel.
为什么有两节课?哪个是"正确的"?有没有办法将一个转换成另一个?
我想使用下面的代码转换spark数据框:
from pyspark.mllib.clustering import KMeans
spark_df = sqlContext.createDataFrame(pandas_df)
rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)
详细的错误消息是:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-11-a19a1763d3ac> in <module>()
1 from pyspark.mllib.clustering import KMeans
2 spark_df = sqlContext.createDataFrame(pandas_df)
----> 3 rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
4 model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in __getattr__(self, name)
842 if name not in self.columns:
843 raise AttributeError(
--> 844 "'%s' object …Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark spark-dataframe apache-spark-mllib