YARN上的Apache Spark:大量输入数据文件(在spark中组合多个输入文件)

zeo*_*dtr 15 hadoop hadoop-yarn apache-spark

需要帮助实施最佳实践.操作环境如下:

  • 日志数据文件不定期到达.
  • 日志数据文件的大小为3.9KB到8.5MB.平均约1MB.
  • 数据文件的记录数从13行到22000行.平均约2700行.
  • 必须在聚合之前对数据文件进行后处理.
  • 后处理算法可以改变.
  • 后处理文件与原始数据文件分开管理,因为后处理算法可能会更改.
  • 执行每日聚合.必须逐个记录所有后处理数据文件,并计算聚合(平均值,最大最小值...).
  • 由于聚合是细粒度的,因此聚合后的记录数量不会太少.它可以是原始记录数量的大约一半.
  • 在某一点上,后处理文件的数量可以是大约200,000.
  • 应该能够单独删除数据文件.

在测试中,我尝试使用带有glob路径的sc.textFile()从Spark处理160,000个后处理文件,但在驱动程序进程中出现OutOfMemory异常失败.

处理此类数据的最佳做法是什么?我应该使用HBase而不是普通文件来保存后处理数据吗?

Rom*_*kov 9

我们写了自己的装载机 它通过HDFS中的小文件解决了我们的问题.它使用Hadoop CombineFileInputFormat.在我们的例子中,它将映射器的数量从100000减少到大约3000,并使工作速度明显加快.

https://github.com/RetailRocket/SparkMultiTool

例:

import ru.retailrocket.spark.multitool.Loaders 
val sessions = Loaders.combineTextFile(sc, "file:///test/*") 
// or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n") 
// where size is split size in Megabytes, delim - line break character 
println(sessions.count())
Run Code Online (Sandbox Code Playgroud)

  • 由于现在hadoop支持CombineTextInputFormat(至少从2.2开始),因此可以使用sc.newAPIHadoopFile()完成小的输入文件的合并,而无需实现自定义类。 (2认同)