Aug*_*sto 8 scala apache-spark
首先让我指出我对Spark和Scala都很陌生.我试图通过尝试迁移我过去做过的Hadoop Map/Reduce Jobs之一来试图调查承诺的Spark性能.这项工作需要花费14分钟在Hadoop上使用3x r3.2xlarge机器输入16个压缩的bzip文件,每个文件170mb.我把它翻译成Scala/Spark,我可以把它变成这样的东西:
val conceptData = spark.textFile(inputPath)
val result = conceptData.repartition(60).cache()
.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1,List((metrics._1,t._2.head))))))
.reduceByKey((a,b) => combine(a,b))
.map(t => t._1 + "\t" + t._2._1 + "\t" + print(t._2._2))
result.saveAsTextFile(outputPath)
def print(tuples: List[(String, Any)]): String =
{
tuples.map(l => l._1 + "\u200e" + l._2).reduce(_ + "\u200f" + _)
}
def combine(a: (Int, List[(String, Any)]), b: (Int, List[(String, Any)])): (Int, List[(String, Any)]) =
{
(a._1 + b._1,a._2 ++ b._2)
}
object JsonUtil {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
mapper.readValue[T](json)
}
}
Run Code Online (Sandbox Code Playgroud)
我在开头使用repartition命令将分区设置为60,因为我读到了每个核心有2-3个分区的好处.我在相同的3x r3.2xlarge机器上运行这个Spark作业(每个机器有8个核心和58G可用)所以我以下列方式提交我的工作:
spark/bin/spark-submit --executor-memory 58G --total-executor-cores 24 (... other arguments ...)
Run Code Online (Sandbox Code Playgroud)
运行相同的输入需要1个多小时...我不确定问题是在Scala还是Spark配置中,所以欢迎任何帮助.
最好的问候,奥古斯托
编辑1:某些操作的平均时间:
从S3读取文件:~2分钟
flatMap:~11分钟
reduceByKey:> 1小时
使用的密钥是S3路径,所以它们可能会很长,不知道是否有所作为.
编辑2:我替换了reduceByKey
函数,.reduceByKey((a,b) => a)
并且作业在10分钟结束,因此combine
函数必定存在一些问题
这归结于我菜鸟的 Scala 编程技巧 - 当更改为以下性能更高的 Scala 时,只需要 15 分钟:
val conceptData = spark.textFile(inputPath).repartition(24)
val result = conceptData.map(line => {val metrics = JsonUtil.fromJson[ArticleMetrics](line); (metrics.source, metrics.data.get("entities").get)})
.flatMap(metrics => metrics._2.map(t => (t._1,(1, List(metrics._1+"\u200e"+ t._2.head)))))
.reduceByKey((a,b) => (a._1 + b._1, a._2:::b._2))
.map(t=> t._1 + "\t" + t._2._1 + "\t" + t._2._2.mkString("\u200f"))
Run Code Online (Sandbox Code Playgroud)
它可能还可以进一步改进。不管怎样,谢谢大家的帮助。
此致,
奥古斯托
归档时间: |
|
查看次数: |
1220 次 |
最近记录: |