使用PySpark在数据框架上应用sklearn训练的模型

Pie*_*rre 7 python scikit-learn apache-spark pyspark

我使用Python训练了一个随机森林算法,并希望将它应用于PySpark的大数据集.

我首先加载了训练有素的sklearn RF模型(使用joblib),将包含这些功能的数据加载到Spark数据帧中,然后添加一个包含预测的列,并使用用户定义的函数:

def predictClass(features):
    return rf.predict(features)
udfFunction = udf(predictClass, StringType())
new_dataframe = dataframe.withColumn('prediction', 
udfFunction('features'))
Run Code Online (Sandbox Code Playgroud)

它需要花费很多时间才能运行,是否有更有效的方法来做同样的事情?(不使用Spark ML)

小智 2

我在最近的项目中必须做同样的事情。为每一行应用 udf 的坏处是 pyspark 每次都必须读取 sklearn 模型,这就是为什么需要很长时间才能完成。我发现的最好的解决方案是在 rdd 上使用 .mapPartitions 或 foreachPartition 方法,这里有很好的解释

https://github.com/mahmoudparsian/pyspark-tutorial/blob/master/tutorial/map-partitions/README.md

它工作得很快,因为它确保您没有洗牌,并且对于每个分区,pyspark 只需要读取模型并预测一次。因此,流程将是:

  • 将 DF 转换为 RDD
  • 将模型广播到节点,以便工作人员可以访问它
  • 编写一个 udf 函数,它将 interator(包含分区内的所有行)作为参数
  • 遍历行并使用您的特征创建适当的矩阵(顺序很重要)
  • 仅调用 .predict 一次
  • 返回预测
  • 如果需要,将 rdd 转换为 df