How to get file metadata when retrieving data from HDFS?

Mak*_*nii 2 hadoop hdfs apache-spark

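I am reading data from HDFS and would like to get the metadata of the files it was read from. This would let me build reports that reflect the data available at a given moment.

I found a solution that uses org.apache.hadoop.fs.FileSystem to get a listing of all files. I know the partitioning rule, so I can build a row -> meta mapping from that listing.

But this approach seems hard to implement and maintain. Is there a simpler way to achieve the same result?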

Sri*_*vas 5

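I have created a small helper method, metadata, that you can invoke directly on a DataFrame object, e.g. df.metadata. It builds a new DataFrame from the metadata of the input files and returns it.

Meta columns in the final DataFrame

  • path
  • isDirectory
  • length -- displayed in human-readable format, e.g. 47 bytes
  • replication
  • blockSize -- displayed in human-readable format, e.g. 32 MB
  • modificationTime -- converted from Unix time to a regular date-time
  • accessTime
  • owner
  • group
  • permission
  • isSymlink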
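To illustrate the two conversions noted in the list above (byte count to a human-readable size, Unix epoch milliseconds to a date-time), here is a minimal sketch that uses only the Java standard library. `MetaFormatting`, `displaySize` and `displayTime` are hypothetical names and are not part of the helper; the helper itself relies on commons-io `FileUtils.byteCountToDisplaySize` and Joda-Time `DateTime`, whose exact output format may differ from this sketch.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical stand-ins for the formatting done by the helper.
object MetaFormatting {
  private val units = Seq("bytes", "KB", "MB", "GB", "TB")

  // Divide by 1024 until the value fits the unit, truncating any remainder.
  def displaySize(bytes: Long): String = {
    var value = bytes
    var unit = 0
    while (value >= 1024 && unit < units.size - 1) {
      value /= 1024
      unit += 1
    }
    s"$value ${units(unit)}"
  }

  // Render epoch milliseconds as an ISO-8601 date-time in the given zone.
  def displayTime(epochMillis: Long, zone: ZoneId): String =
    DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(
      Instant.ofEpochMilli(epochMillis).atZone(zone))
}
```

With this sketch, `MetaFormatting.displaySize(47L)` yields "47 bytes" and `MetaFormatting.displaySize(32L * 1024 * 1024)` yields "32 MB". An access time of 0 maps to the Unix epoch, which would explain 1970-01-01 values in the accessTime column: a filesystem that does not record access times typically reports 0.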

scala> :paste
// Entering paste mode (ctrl-D to finish)

  import org.joda.time.DateTime
  import org.apache.commons.io.FileUtils
  import org.apache.spark.sql.DataFrame
  import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}

  // Storing Metadata
  case class FileMetaData(path: String,
                          isDirectory:Boolean,
                          length:String,
                          replication:Int,
                          blockSize:String,
                          modificationTime: String,
                          accessTime:String ,
                          owner:String ,
                          group:String ,
                          permission:String,
                          isSymlink:Boolean)

  object FileMetaData {

    def apply(lfs: LocatedFileStatus):FileMetaData = {        
      new FileMetaData(
        path= lfs.getPath.toString,
        isDirectory=lfs.isDirectory,
        length=FileUtils.byteCountToDisplaySize(lfs.getLen),
        replication=lfs.getReplication,
        blockSize=FileUtils.byteCountToDisplaySize(lfs.getBlockSize),
        modificationTime=new DateTime(lfs.getModificationTime).toString,
        accessTime=new DateTime(lfs.getAccessTime).toString ,
        owner=lfs.getOwner ,
        group=lfs.getGroup ,
        permission=lfs.getPermission.toString,
        isSymlink=lfs.isSymlink
      )
    }
  }

  // Convert RemoteIterator to Scala Iterator.
  implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
  }

  // Using this we can call metadata method on df - like df.metadata.
  implicit class MetaData(df: DataFrame) {
    def metadata: DataFrame = {
      import df.sparkSession.implicits._
      val conf = df.sparkSession.sparkContext.hadoopConfiguration
      df.inputFiles
        .map(new Path(_))
        // Resolve the FileSystem from each path so non-default filesystems also work.
        .flatMap(path => path.getFileSystem(conf).listLocatedStatus(path).toList)
        .map(FileMetaData(_))
        .toList
        .toDF
    }
  }

// Exiting paste mode, now interpreting.

warning: there was one feature warning; re-run with -feature for details
import org.joda.time.DateTime
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
defined class FileMetaData
defined object FileMetaData
convertToScalaIterator: [T](remoteIterator: org.apache.hadoop.fs.RemoteIterator[T])Iterator[T]
defined class MetaData

scala> val df = spark.read.format("json").load("/tmp/data")
df: org.apache.spark.sql.DataFrame = [json_data: struct<value: string>]



scala> df.show(false)
+------------------+
|json_data         |
+------------------+
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
+------------------+

scala>


DataFrame metadata output

scala> df.metadata.show(false)

+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|path                     |isDirectory|length  |replication|blockSize|modificationTime             |accessTime                   |owner   |group|permission|isSymlink|
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|file:/tmp/data/fileB.json|false      |47 bytes|1          |32 MB    |2020-04-25T13:47:00.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false    |
|file:/tmp/data/fileC.json|false      |47 bytes|1          |32 MB    |2020-04-25T13:47:10.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false    |
|file:/tmp/data/fileA.json|false      |47 bytes|1          |32 MB    |2020-04-25T11:35:12.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false    |
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
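The helper above returns one row per input file. For the row -> meta mapping mentioned in the question, one option might be to tag each data row with its source file using Spark's built-in `input_file_name()` function. The sketch below assumes Spark is on the classpath; `RowSource`, `withSourceFile` and the `sourceFile` column name are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.input_file_name

// Hypothetical helper: adds a column holding the URI of the file each row came from.
object RowSource {
  def withSourceFile(df: DataFrame, column: String = "sourceFile"): DataFrame =
    // input_file_name() yields an empty string for rows that were not read from a file.
    df.withColumn(column, input_file_name())
}
```

For example, `RowSource.withSourceFile(df).groupBy("sourceFile").count().show(false)` would show the number of rows per file. Joining this column against the path column of df.metadata is possible in principle, but the two URI strings are not guaranteed to have the same form (for instance file:/ versus file:///), so some normalization may be needed first.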