Vla*_*nko 12 machine-learning amazon-web-services elastic-map-reduce apache-spark
我正在使用Spark 1.4的ML库中的Gradient Boosted Trees学习算法进行实验.我正在解决二进制分类问题,我的输入是大约50,000个样本和~500,000个特征.我的目标是以人类可读的格式输出生成的GBT集合的定义.到目前为止,我的经验是,对于我的问题大小,向群集添加更多资源似乎对运行的长度没有影响.10次迭代训练似乎大约需要13个小时.这是不可接受的,因为我希望进行100-300次迭代运行,并且执行时间似乎随着迭代次数而爆炸.
这不是确切的代码,但可以简化为:
SparkConf sc = new SparkConf().setAppName("GBT Trainer")
// unlimited max result size for intermediate Map-Reduce ops.
// Having no limit is probably bad, but I've not had time to find
// a tighter upper bound and the default value wasn't sufficient.
.set("spark.driver.maxResultSize", "0");
JavaSparkContext jsc = new JavaSparkContext(sc)
// The input file is encoded in plain-text LIBSVM format ~59GB in size
<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), "s3://somebucket/somekey/plaintext_libsvm_file").toJavaRDD();
BoostingStrategy boostingStrategy = BoostingStrategy.defaultParams("Classification");
boostingStrategy.setNumIterations(10);
boostingStrategy.getTreeStrategy().setNumClasses(2);
boostingStrategy.getTreeStrategy().setMaxDepth(1);
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
GradientBoostedTreesModel model = GradientBoostedTrees.train(data, boostingStrategy);
// Somewhat-convoluted code below reads in Parquete-formatted output
// of the GBT model and writes it back out as json.
// There might be cleaner ways of achieving the same, but since output
// size is only a few KB I feel little guilt leaving it as is.
// serialize and output the GBT classifier model the only way that the library allows
String outputPath = "s3://somebucket/somekeyprefex";
model.save(jsc.sc(), outputPath + "/parquet");
// read in the parquet-formatted classifier output as a generic DataFrame object
SQLContext sqlContext = new SQLContext(jsc);
DataFrame outputDataFrame = sqlContext.read().parquet(outputPath + "/parquet"));
// output DataFrame-formatted classifier model as json
outputDataFrame.write().format("json").save(outputPath + "/json");
Run Code Online (Sandbox Code Playgroud)
我的Spark应用程序(或GBT学习算法本身)对该大小的输入有什么性能瓶颈,如何实现更高的执行并行性?
我仍然是新手Spark dev,我很欣赏有关群集配置和执行分析的任何提示.
我在r3.8xlarge实例(32个内核,每个244GB RAM)的AWS EMR集群(emr-4.0.0,YARN集群模式)上运行此应用程序.我正在使用如此大的实例,以最大限度地提高资源分配的灵活性.到目前为止,我已尝试在驱动程序和工作程序之间使用1-3个r3.8xlarge实例和各种资源分配方案.例如,对于1个r3.8xlarge实例的集群,我按如下方式提交应用程序:
aws emr add-steps --cluster-id $1 --steps Name=$2,\
Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,\
Args=[/usr/lib/spark/bin/spark-submit,--verbose,\
--deploy-mode,cluster,--master,yarn,\
--driver-memory,60G,\
--executor-memory,30G,\
--executor-cores,5,\
--num-executors,6,\
--class,GbtTrainer,\
"s3://somebucket/somekey/spark.jar"],\
ActionOnFailure=CONTINUE
Run Code Online (Sandbox Code Playgroud)
对于3个r3.8xlarge实例的集群,我调整了资源分配:
--driver-memory,80G,\
--executor-memory,35G,\
--executor-cores,5,\
--num-executors,18,\
Run Code Online (Sandbox Code Playgroud)
我不清楚每个执行者有多少记忆是有用的,但我觉得在任何一种情况下我都很慷慨.通过Spark UI,我没有看到输入大小超过几GB的任务.在为驱动程序进程提供如此多的内存时,我正在谨慎行事,以确保它不会因任何中间结果聚合操作而缺乏内存.
根据Cloudera的"如何调整你的Spark Jobs"系列中的建议,我试图将每个执行器的内核数量减少到5个(根据他们的说法,5个内核往往会引入HDFS IO瓶颈).我还要确保有足够的备用RAM和CPU留给主机操作系统和Hadoop服务.
我唯一的线索是Spark UI在执行的尾部为许多任务显示非常长的调度延迟.我也感觉到Spark UI显示的阶段/任务时间线并不能解释作业完成所有时间.我怀疑驱动程序应用程序在每次训练迭代结束时或整个训练运行结束时都会执行某种冗长的操作.
我已经对调整Spark应用程序做了大量研究.大多数文章都会提供有关使用RDD操作的很好建议,这些操作可以减少中间输入大小或避免阶段之间的数据混乱.在我的情况下,我基本上使用的是"开箱即用"算法,该算法由ML专家编写,应该已经在这方面进行了很好的调整.我自己的代码将GBT模型输出到S3应该花费一些微不足道的时间来运行.
我没有使用过 MLLibs GBT 实现,但我都使用过
LightGBM和XGBoost成功。我强烈建议您看看其他这些库。
一般来说,GBM 实现需要迭代地训练模型,因为它们在构建下一棵树时考虑到整个集成的损失。这使得 GBM 训练本质上存在瓶颈,并且不容易并行化(与可以轻松并行化的随机森林不同)。我希望它能在任务较少的情况下表现更好,但这可能不是您的全部问题。由于您有如此之多的特征(500K),因此在训练期间计算直方图和分割点时,您将有非常高的开销。您应该减少拥有的特征数量,特别是因为它们比样本数量大得多,这会导致过度拟合。
至于调整集群:您希望最大限度地减少数据移动,因此执行器更少,内存更多。每个 ec2 实例 1 个执行程序,核心数量设置为实例提供的任何数量。
您的数据足够小,可以容纳大约 2 个该大小的 EC2。假设您使用双精度(8 字节),则大小为 8 * 500000 * 50000 = 200 GB 尝试通过.cache()在数据帧上使用将其全部加载到 RAM 中。如果您对所有行执行操作(例如求和),您应该强制加载它,并且可以测量 IO 花费的时间。一旦它进入内存并缓存任何其他操作,它就会更快。
| 归档时间: |
|
| 查看次数: |
1084 次 |
| 最近记录: |