为什么spark.ml CrossValidator给出“广播大小为X的大型任务二进制文件”和大数据集?

Jul*_*les 6 machine-learning apache-spark pyspark apache-spark-ml

问题:

\n

我\xe2\x80\x99一直致力于使用Pyspark和Spark ml库分发交叉验证过程,因此它比常规顺序计算(即scikit)需要更少的时间。但是,我\xe2\x80\x99m 在执行此操作时遇到了一些问题。具体来说,当我开始工作时,我不断收到消息 \xe2\x80\x9c Broadcasting large task binary with size X \xe2\x80\x9d (X 是从 1700 KiB 到 6 MiB 的数字)。在我离开作业一段时间后,它最终以消息 \xe2\x80\x9c作业 X 取消,因为 SparkContext 已关闭\xe2\x80\x9d (对于很多 Xs = 作业)和 \xe2\x80\结束x9c错误 TransportRequestHandler:为单向消息调用 RpcHandler#receive() 时出错。org.apache.spark.SparkException:找不到 CoarseGrainedScheduler \xe2\x80\x9d。

\n

推理:

\n

由于我\xe2\x80\x99ve必须修改\xe2\x80\x9c pyspark.ml.tuning#CrossValidator \xe2\x80\x9d中CrossValidator _fit方法的源代码,所以我\xe2\x80\x99m足够熟悉它如何运作才能知道它分配任务的方式是通过对数据集的每个分割并行化具有不同参数设置的模型训练。也就是说,CrossValidator _fit 每次都会将整个数据集发送给执行器,以便在每个执行器中单独训练具有特定参数组合的模型,并且 Spark 似乎不太喜欢广播数据集。这是 pyspark.ml.tunning _fit 方法的相关部分:

\n
        for i in range(nFolds):\n        validation = datasets[i][1].cache()\n        train = datasets[i][0].cache()\n\n        tasks = _parallelFitTasks(est, train, eva, validation, epm, collectSubModelsParam)\n        for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):\n            metrics[j] += (metric / nFolds)\n            if collectSubModelsParam:\n                subModels[i][j] = subModel\n\n        validation.unpersist()\n        train.unpersist()\n
Run Code Online (Sandbox Code Playgroud)\n

我尝试过的:

\n

我已经尝试了最常见的解决方案来解决我收到的广播警告,尽管我已经想象它们在我的情况下不起作用。具体来说,我修改了数据分区和并行化参数,以及执行器和驱动程序的内存大小。

\n

我很确定如果ML库中存在CrossValidator的分布式实现,是因为它确实有用。然而,我一定错过了一些东西,因为我\xe2\x80\x99m无法思考如果我的数据集很大并且需要广播很多次(因为实现)如何让它工作。也许我\xe2\x80\x99m 缺少一些东西?

\n