我原来的问题是为什么使用DecisionTreeModel.predict内部地图功能会引发异常?并且与如何使用MLlib在Spark上生成(原始标签,预测标签)的元组有关?
当我们使用Scala API时,推荐RDD[LabeledPoint]使用预测的方法DecisionTreeModel是简单地映射RDD:
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
Run Code Online (Sandbox Code Playgroud)
遗憾的是,PySpark中的类似方法效果不佳:
labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
Run Code Online (Sandbox Code Playgroud)
例外:您似乎尝试从广播变量,操作或转换引用SparkContext.SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用.有关更多信息,请参阅SPARK-5063.
而不是官方文档推荐这样的东西:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Run Code Online (Sandbox Code Playgroud)
那么这里发生了什么?此处没有广播变量,Scala API定义predict如下:
/**
* Predict values for a single data point using the model trained.
*
* @param features array representing …Run Code Online (Sandbox Code Playgroud) 我有一个Dataframe,我试图压扁.作为整个过程的一部分,我想爆炸它,所以如果我有一列数组,那么数组的每个值都将用于创建一个单独的行.例如,
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
Run Code Online (Sandbox Code Playgroud)
应该成为
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
Run Code Online (Sandbox Code Playgroud)
这是我的代码
private DataFrame explodeDataFrame(DataFrame df) {
DataFrame resultDf = df;
for (StructField field : df.schema().fields()) {
if (field.dataType() instanceof ArrayType) {
resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
resultDf.show();
}
}
return resultDf;
}
Run Code Online (Sandbox Code Playgroud)
问题是在我的数据中,一些数组列有空值.在这种情况下,整个行都将被删除.所以这个数据帧:
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
2 | Lucy | null
Run Code Online (Sandbox Code Playgroud)
变 …
比方说,我的团队选择Python作为Spark开发的参考语言.但是后来由于性能原因,我们希望开发特定的Scala或Java特定的库,以便使用我们的Python代码(类似于使用Scala或Java骨架的Python存根)进行映射.
难道您不认为是否可以将新的自定义Python方法与一些Scala或Java用户定义函数联系起来?