Spark on Cluster: Reading a large number of small avro files is taking too long to list

ni_*_*ama 5 scala hdfs apache-spark spark-avro

I know that reading a large number of small files in HDFS is a long-standing and widely discussed problem, but please bear with me. Most stackoverflow questions dealing with this kind of problem are about reading a large number of small txt files; I am trying to read a large number of small avro files.

Also, those txt-file solutions talk about using WholeTextFileInputFormat or CombineInputFormat (/sf/answers/3072911341/), which are RDD implementations. I am on Spark 2.4 (HDFS 3.0.0), where RDD implementations are generally discouraged and dataframes are preferred. I would rather use dataframes, but I am open to an RDD implementation as well.

I have already tried unioning dataframes as Murtaza suggested, but on a large number of files I get an OOM error (/sf/answers/2248236301/).
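The union approach looks roughly like the sketch below (simplified, not my exact code; spark is the SparkSession). Each small file becomes its own DataFrame and they are unioned one by one, so the plan grows with every union, which is what blows up on a large number of files:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of the union-per-file approach (assumed shape, not the exact code I ran).
val spark = SparkSession.builder().appName("AvroUnion").getOrCreate()
val unioned: DataFrame = filePaths                 // filePaths: Array[String]
  .map(p => spark.read.format("com.databricks.spark.avro").load(p))
  .reduce(_ union _)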

The code I am actually using is the following:

val filePaths = avroConsolidator.getFilesInDateRangeWithExtension // filePaths: Array[String]
// I do need to build an explicit list of file paths, because files are filtered by file name.
// This logic is required by an upstream process.
// example: Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro",
//                "hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro",
//                "hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
val df = df_mid
  .withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
  .filter("dt != 'null'")

df
  .repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)): _*)
  .write.partitionBy(partitionColumns(inputs.logSubType): _*)
  .mode(SaveMode.Append)
  .option("compression", "snappy")
  .parquet(avroConsolidator.parquetFilePath.toString)

At the job level, listing the 183 small files takes 1.6 minutes (Job UI screenshot).

Strangely, my Stage UI page shows only 3 seconds (I don't understand why) (Stage UI screenshot).

The avro files are stored in yyyy/mm/dd partitions: hdfs://server123:8020/source/Avro/weblog/2019/06/03

Is there any way to speed up the listing of the leaf files? As you can see from the screenshots, consolidating into parquet files takes only 6 seconds, but listing the files takes 1.3 minutes.

ni_*_*ama 2

Since reading the large number of small files was taking too long, I took a step back and created RDDs using CombineFileInputFormat. This InputFormat works well with small files because it packs many of them into a single split, so there are fewer mappers and each mapper has more data to process.


Here is what I did:

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {

   val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
   FileInputFormat.setInputPaths(job, filePaths: _*)
   val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))

   // One split now covers many small files, so far fewer tasks are created.
   val rddKV = sc.sparkContext.newAPIHadoopRDD(
                   job.getConfiguration,
                   classOf[CombinedAvroKeyInputFormat[GenericRecord]],
                   classOf[AvroKey[GenericRecord]],
                   classOf[NullWritable])

   val rowRDD = rddKV.mapPartitions(
                  f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
                       iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType)),
                  preservesPartitioning = true)

   val df = sc.sqlContext.createDataFrame(rowRDD,
              sqlType.dataType.asInstanceOf[StructType])
   df
}
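getSchema and genericRecordToRow are small helpers that are not shown above: getSchema returns the Avro Schema for the given log sub-type, and genericRecordToRow converts an Avro GenericRecord into a Spark Row that matches the converted schema. A minimal sketch of the latter, assuming flat (non-nested) records, could look like this (hypothetical, not the exact implementation):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import com.databricks.spark.avro.SchemaConverters

// Hypothetical helper: builds a Row by looking up each struct field in the record.
// Avro strings arrive as org.apache.avro.util.Utf8 and are converted to String;
// nested records, unions, enums, etc. would need additional handling.
def genericRecordToRow(record: GenericRecord, sqlType: SchemaConverters.SchemaType): Row = {
  val struct = sqlType.dataType.asInstanceOf[StructType]
  val values = struct.fields.map { field =>
    record.get(field.name) match {
      case s: Utf8 => s.toString
      case other   => other
    }
  }
  Row.fromSeq(values)
}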

CombinedAvroKeyInputFormat is a user-defined class that extends CombineFileInputFormat and packs 64 MB of data into a single split.

import java.io.IOException

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyRecordReader}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit, FileSplit}
import org.apache.log4j.Logger

object CombinedAvroKeyInputFormat {

  class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
    extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration)) {

    @throws[IOException]
    @throws[InterruptedException]
    override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
      this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
      val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
                                    this.inputSplit.getOffset(idx),
                                    this.inputSplit.getLength(idx),
                                    this.inputSplit.getLocations)
      super.initialize(fileSplit, context)
    }
  }
}

/*
 * CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to support it;
 * we'll name the subclass CombinedAvroKeyInputFormat. The subclass instantiates a delegate
 * CombinedAvroKeyRecordReader that extends AvroKeyRecordReader.
 */
class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
  val logger = Logger.getLogger(AvroConsolidator.getClass)
  setMaxSplitSize(67108864) // 64 MB per split

  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
    val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
    val inputSplit = split.asInstanceOf[CombineFileSplit]

    /*
     * CombineFileRecordReader is a built-in class that passes each split to our CombinedAvroKeyRecordReader.
     * When the hadoop job starts, CombineFileRecordReader reads the sizes of all the files in HDFS that we want
     * to process and decides how many splits to create based on the MaxSplitSize.
     */
    new CombineFileRecordReader[AvroKey[T], NullWritable](
      inputSplit,
      context,
      c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
  }
}

This made reading the small files much faster.
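For completeness, wiring this into the consolidation job looks roughly like the sketch below; the names filePaths, inputs and avroConsolidator follow the question, and the exact invocation is an assumption:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{SaveMode, SparkSession}

// Rough usage sketch (assumed, not from the original code): build the DataFrame
// through the combined input format, then write snappy-compressed parquet as in the question.
val spark = SparkSession.builder().appName("AvroConsolidator").getOrCreate()
val df = createDataFrame(filePaths.map(new Path(_)), spark, inputs) // filePaths: Array[String]

df.write
  .mode(SaveMode.Append)
  .option("compression", "snappy")
  .parquet(avroConsolidator.parquetFilePath.toString)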
