递归列出文件并按扩展名过滤

And*_*rew 2 scala apache-spark

我试图递归地遍历给定的目录及其所有子目录,并列出以 .json 结尾的所有文件。感谢这个答案,我已经得到了递归位的工作。现在我正在尝试找出过滤部分。

这是我当前的尝试:

    import org.apache.hadoop.fs.{FileSystem,Path, PathFilter}
    import org.apache.hadoop.fs.Path;

    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    def listJsonFiles(hdfsPath: String): List[String] = {
      fs
        .listStatus(new Path(hdfsPath)).filter(_.getPath.getName.endsWith(".json"))
        //.listStatus(new Path(hdfsPath))
        .flatMap { obj =>
          if (obj.isFile)
            List(hdfsPath + "/" + obj.getPath.getName)
          // If it's a dir and we're in a recursive option:
          else
            listJsonFiles(hdfsPath + "/" + obj.getPath.getName)
        }
        .toList
    }
    val files = listJsonFiles("/path/to/some/stuff")
Run Code Online (Sandbox Code Playgroud)

这会返回一个空列表。如果我使用不带过滤器的 listStatus 行,它会返回我传入的路径中所有文件的列表,包括所有 *.json 文件。所以我知道它们存在,但我无法让过滤器工作。

小智 5

问题是首先应用过滤器来过滤掉子目录。将过滤器向下移动到 flatMap。

\n\n
import org.apache.hadoop.fs.{FileSystem, Path}\n\nval fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)\n\ndef listJsonFiles(hdfsPath: String): List[String] = {\n  fs\n    .listStatus(new Path(hdfsPath))\n    //.listStatus(new Path(hdfsPath))\n    .flatMap { obj =>\n      if (obj.isDirectory) listJsonFiles(obj.getPath.toString)\n      else if(obj.getPath.getName.endsWith(".json")) List(obj.getPath.toString)\n      else Nil\n    }\n    .toList\n}\nval files = listJsonFiles("/var/tmp/stuff")\nfiles.foreach(println)\n
Run Code Online (Sandbox Code Playgroud)\n\n

文件系统上有本地方法来递归扫描 HDFS 目录。

\n\n

FileSystem.listFiles接受一个参数来进行递归搜索并返回,RemoteIteator我们可以使用它来过滤所需的文件。这也可以保护 JVM 运行免受 OOO​​ 的影响。

\n\n
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}\n\nval fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)\n\n// implicit for converting remote Iterator to scala iterator\nimplicit def  remoteIteratorToIterator[A](ri: RemoteIterator[A]): Iterator[A] = new Iterator[A] {\n  override def hasNext: Boolean = ri.hasNext\n\n  override def next(): A = ri.next()\n}\n\ndef listJsonFiles(hdfsPath: String): List[String] = {\n  fs\n    .listFiles(new Path(hdfsPath), true)\n    .map(_.getPath.toString)\n    .filter(_.endsWith(".json"))\n    .toList\n}\nval files = listJsonFiles("/var/tmp/stuff")\nfiles.foreach(println)\n
Run Code Online (Sandbox Code Playgroud)\n\n

输入目录

\n\n
$ tree stuff/\nstuff/\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dir1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dir1.json\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dir2\n\xe2\x94\x82\xc2\xa0\xc2\xa0     \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dir2.json\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 stuff.json\n\n2 directories, 3 files\n
Run Code Online (Sandbox Code Playgroud)\n\n

输出

\n\n
file:/var/tmp/stuff/stuff.json\nfile:/var/tmp/stuff/dir1/dir2/dir2.json\nfile:/var/tmp/stuff/dir1/dir1.json\n
Run Code Online (Sandbox Code Playgroud)\n