我正在尝试最大化群集使用以完成一项简单的任务.
群集是1 + 2 x m3.xlarge,运行Spark 1.3.1,Hadoop 2.4,Amazon AMI 3.7
该任务读取文本文件的所有行并将其解析为csv.
当我将任务作为纱线群集模式提交时,我得到以下结果之一:
我期待的是:
有时,当我使用1个执行程序执行"成功"执行时,克隆并重新启动该步骤最终会有0个执行程序.
我使用此命令创建了我的集群:
aws emr --region us-east-1 create-cluster --name "Spark Test"
--ec2-attributes KeyName=mykey
--ami-version 3.7.0
--use-default-roles
--instance-type m3.xlarge
--instance-count 3
--log-uri s3://mybucket/logs/
--bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=["-x"]
--steps Name=Sample,Jar=s3://elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--master,yarn,--deploy-mode,cluster,--class,my.sample.spark.Sample,s3://mybucket/test/sample_2.10-1.0.0-SNAPSHOT-shaded.jar,s3://mybucket/data/],ActionOnFailure=CONTINUE
Run Code Online (Sandbox Code Playgroud)
有一些步骤变化,包括:
--driver-memory 8G --driver-cores 4 --num-executors 2
使用-x的install-spark脚本生成以下spark-defaults.conf:
$ cat spark-defaults.conf
spark.eventLog.enabled false
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
spark.driver.extraJavaOptions -Dspark.driver.log.level=INFO
spark.executor.instances 2
spark.executor.cores 4
spark.executor.memory 9404M
spark.default.parallelism 8
Run Code Online (Sandbox Code Playgroud)
更新1
我使用通用JavaWordCount示例获得相同的行为:
/home/hadoop/spark/bin/spark-submit --verbose --master yarn --deploy-mode …
Run Code Online (Sandbox Code Playgroud) 我在Amazon EMR集群上提交作业.我想将所有火花记录发送到redis/logstash.在EMR下配置spark的正确方法是什么?
保持log4j:添加一个bootstrap动作来修改/home/hadoop/spark/conf/log4j.properties来添加一个appender?但是,这个文件已经包含了很多东西,并且是hadoop conf文件的符号链接.我不想过多地使用它,因为它已经包含了一些rootLogger.哪个appender会做得最好?ryantenney/log4j-redis-appender + logstash/log4j-jsonevent-layout或pavlobaron/log4j2redis?
迁移到slf4j + logback:从spark-core中排除slf4j-log4j12,添加log4j-over-slf4j ...并使用带有com.cwbase.logback.RedisAppender的logback.xml?看起来这将依赖于问题.它会隐藏已在log4j.properties中定义的log4j.rootLoggers吗?
我错过了什么?
你对此有何看法?
更新
看起来我无法获得第二种选择.运行测试很好但是使用spark-submit(使用--conf spark.driver.userClassPathFirst = true)总是会在类路径上找到可怕的"检测到log4j-over-slf4j.jar和slf4j-log4j12.jar",抢占StackOverflowError."
我正在根据自定义排序对整数索引进行一些排序.我发现这里使用的Ordering [T]使用直接调用compare方法比使用手工制作的quickSort慢至少10倍.这看起来非常昂贵!
val indices: Array[Int] = ...
class OrderingByScore extends Ordering[Int] { ... }
time { (0 to 10000).par.foreach(x => {
scala.util.Sorting.quickSort[Int](indices.take(nb))(new OrderingByScore)
})}
// Elapsed: 30 seconds
Run Code Online (Sandbox Code Playgroud)
与手工制作的sortArray相比,此处可以修改以添加ord: Ordering[Int]
参数:
def sortArray1(array: Array[Int], left: Int, right: Int, ord: Ordering[Int]) = ...
time { (0 to 10000).par.foreach(x => {
sortArray1(indices.take(nb), 0, nb - 1, new OrderingByScore)
})}
// Elapsed: 19 seconds
Run Code Online (Sandbox Code Playgroud)
最后,相同的代码片段,但使用精确类型而不是(ord: OrderingByScore
):
def sortArray2(array: Array[Int], left: Int, right: Int, ord: OrderingByScore) = ...
time …
Run Code Online (Sandbox Code Playgroud) apache-spark ×2
emr ×1
hadoop-yarn ×1
log4j ×1
logback ×1
performance ×1
scala ×1
slf4j ×1
sorting ×1