Tags: scala, hdfs, apache-spark, spark-avro
I know that reading large numbers of small files from HDFS is a long-standing and widely discussed problem, but bear with me. Most Stack Overflow questions on this topic are about reading a large number of small txt files; I am trying to read a large number of small Avro files.
Also, those txt-file solutions suggest using WholeTextFileInputFormat or CombineInputFormat (/sf/answers/3072911341/), which are RDD-based implementations. I am on Spark 2.4 (HDFS 3.0.0), where RDD implementations are generally discouraged and DataFrames are preferred. I would rather use DataFrames, but am open to an RDD implementation as well.
I have already tried coalescing the DataFrame as Murtaza suggested, but on a large number of files it fails with an OOM error (/sf/answers/2248236301/).
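For reference, that coalesce-based attempt looked roughly like the sketch below (my own reconstruction of the suggestion, not my exact code; the output path and the coalesce factor are placeholders). Forcing everything into a handful of partitions concentrates the data on a few tasks, which is where the OOM came from:

import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch of the coalesce-based attempt: read all the small Avro files,
// then collapse them into a few large output files.
val spark: SparkSession = SparkSession.builder().getOrCreate()
val smallAvroPaths: Seq[String] = Seq(
  "hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro",
  "hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro")
val outputPath = "hdfs://server123:8020/target/Parquet/weblog"   // placeholder

spark.read
  .format("com.databricks.spark.avro")
  .load(smallAvroPaths: _*)
  .coalesce(16)                  // collapse into ~16 partitions -> ~16 output files
  .write
  .mode(SaveMode.Append)
  .option("compression", "snappy")
  .parquet(outputPath)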
I am using the following code:
val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String]
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*) // sc is a SparkSession here, despite the name
val df = df_mid
.withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
.filter("dt != 'null'")
df
.repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns(inputs.logSubType): _*)
.mode(SaveMode.Append)
.option("compression","snappy")
.parquet(avroConsolidator.parquetFilePath.toString)
The Avro files are stored in yyyy/mm/dd partitions, e.g. hdfs://server123:8020/source/Avro/weblog/2019/06/03
Is there any way to speed up the listing of the leaf files? As the screenshot showed, consolidating into parquet files takes only 6 seconds, but listing the files takes 1.3 minutes.
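One knob that may be worth experimenting with for the listing step (I have not verified that it helps this exact workload) is Spark 2.x's parallel file-listing configuration: once the number of input paths exceeds the threshold, the listing runs as a distributed Spark job whose task count is capped by the parallelism setting. A sketch, with placeholder values:

import org.apache.spark.sql.SparkSession

// Sketch only: tune how Spark parallelizes leaf-file listing (Spark 2.x settings).
val spark = SparkSession.builder()
  .appName("AvroConsolidator")
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")    // paths above this trigger a distributed listing job
  .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "100") // cap on the listing job's parallelism
  .getOrCreate()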
Since reading the large number of small files was taking too long, I took a step back and created an RDD using CombineFileInputFormat. This InputFormat works well with small files because it packs many of them into a single split, so there are fewer mappers and each mapper has more data to process.
Here is what I did:
def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {

  val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
  FileInputFormat.setInputPaths(job, filePaths: _*)
  val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))

  val rddKV = sc.sparkContext.newAPIHadoopRDD(
    job.getConfiguration,
    classOf[CombinedAvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])

  val rowRDD = rddKV.mapPartitions(
    f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
      iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType)),
    preservesPartitioning = true)

  val df = sc.sqlContext.createDataFrame(rowRDD,
    sqlType.dataType.asInstanceOf[StructType])
  df
}

CombinedAvroKeyInputFormat is a user-defined class that extends CombineFileInputFormat and packs up to 64 MB of data into a single split.
object CombinedAvroKeyInputFormat {

  class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
    extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration)) {

    @throws[IOException]
    @throws[InterruptedException]
    override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
      this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
      val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
                                    this.inputSplit.getOffset(idx),
                                    this.inputSplit.getLength(idx),
                                    this.inputSplit.getLocations)
      super.initialize(fileSplit, context)
    }
  }

}

/*
 * CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to use it;
 * we name the subclass CombinedAvroKeyInputFormat. It creates a delegate CombinedAvroKeyRecordReader that extends AvroKeyRecordReader.
 */

class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
  val logger = Logger.getLogger(AvroConsolidator.getClass)
  setMaxSplitSize(67108864) // 64 MB per split

  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
    val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
    val inputSplit = split.asInstanceOf[CombineFileSplit]

    /*
     * CombineFileRecordReader is a built-in class that passes each split to our CombinedAvroKeyRecordReader.
     * When the hadoop job starts, CombineFileRecordReader reads the sizes of all the HDFS files we want it to process
     * and decides how many splits to create based on the MaxSplitSize.
     */
    new CombineFileRecordReader[AvroKey[T], NullWritable](
      inputSplit,
      context,
      c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
  }
}

This made reading the small files much faster.
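For completeness, the genericRecordToRow helper used in createDataFrame above is not shown. It converts an Avro GenericRecord into a Spark Row that matches the SQL schema produced by SchemaConverters.toSqlType. Below is a minimal sketch of what such a helper can look like (my own illustration, not the exact code I used; it assumes the com.databricks.spark.avro SchemaConverters from the rest of the post and handles flat schemas only):

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Illustrative sketch only: walk the SQL schema field by field and pull the
// matching value out of the Avro record. Avro strings arrive as Utf8 and are
// converted to java.lang.String; nested records and arrays are not handled here.
def genericRecordToRow(record: GenericRecord, sqlType: SchemaConverters.SchemaType): Row = {
  val fields = sqlType.dataType.asInstanceOf[StructType].fields
  val values = fields.map { field =>
    record.get(field.name) match {
      case s: Utf8 => s.toString
      case other   => other
    }
  }
  Row(values: _*)
}

With that in place, the DataFrame returned by createDataFrame can be written out with the same repartition/partitionBy/parquet logic as in the question.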