And*_*rew 2 scala apache-spark
我试图递归地遍历给定的目录及其所有子目录,并列出以 .json 结尾的所有文件。感谢这个答案,我已经得到了递归位的工作。现在我正在尝试找出过滤部分。
这是我当前的尝试:
import org.apache.hadoop.fs.{FileSystem,Path, PathFilter}
import org.apache.hadoop.fs.Path;
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
def listJsonFiles(hdfsPath: String): List[String] = {
fs
.listStatus(new Path(hdfsPath)).filter(_.getPath.getName.endsWith(".json"))
//.listStatus(new Path(hdfsPath))
.flatMap { obj =>
if (obj.isFile)
List(hdfsPath + "/" + obj.getPath.getName)
// If it's a dir and we're in a recursive option:
else
listJsonFiles(hdfsPath + "/" + obj.getPath.getName)
}
.toList
}
val files = listJsonFiles("/path/to/some/stuff")
Run Code Online (Sandbox Code Playgroud)
这会返回一个空列表。如果我使用不带过滤器的 listStatus 行,它会返回我传入的路径中所有文件的列表,包括所有 *.json 文件。所以我知道它们存在,但我无法让过滤器工作。
小智 5
问题是首先应用过滤器来过滤掉子目录。将过滤器向下移动到 flatMap。
\n\nimport org.apache.hadoop.fs.{FileSystem, Path}\n\nval fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)\n\ndef listJsonFiles(hdfsPath: String): List[String] = {\n fs\n .listStatus(new Path(hdfsPath))\n //.listStatus(new Path(hdfsPath))\n .flatMap { obj =>\n if (obj.isDirectory) listJsonFiles(obj.getPath.toString)\n else if(obj.getPath.getName.endsWith(".json")) List(obj.getPath.toString)\n else Nil\n }\n .toList\n}\nval files = listJsonFiles("/var/tmp/stuff")\nfiles.foreach(println)\n
Run Code Online (Sandbox Code Playgroud)\n\n文件系统上有本地方法来递归扫描 HDFS 目录。
\n\nFileSystem.listFiles
接受一个参数来进行递归搜索并返回,RemoteIteator
我们可以使用它来过滤所需的文件。这也可以保护 JVM 运行免受 OOO 的影响。
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}\n\nval fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)\n\n// implicit for converting remote Iterator to scala iterator\nimplicit def remoteIteratorToIterator[A](ri: RemoteIterator[A]): Iterator[A] = new Iterator[A] {\n override def hasNext: Boolean = ri.hasNext\n\n override def next(): A = ri.next()\n}\n\ndef listJsonFiles(hdfsPath: String): List[String] = {\n fs\n .listFiles(new Path(hdfsPath), true)\n .map(_.getPath.toString)\n .filter(_.endsWith(".json"))\n .toList\n}\nval files = listJsonFiles("/var/tmp/stuff")\nfiles.foreach(println)\n
Run Code Online (Sandbox Code Playgroud)\n\n输入目录
\n\n$ tree stuff/\nstuff/\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dir1\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dir1.json\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dir2\n\xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dir2.json\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 stuff.json\n\n2 directories, 3 files\n
Run Code Online (Sandbox Code Playgroud)\n\n输出
\n\nfile:/var/tmp/stuff/stuff.json\nfile:/var/tmp/stuff/dir1/dir2/dir2.json\nfile:/var/tmp/stuff/dir1/dir1.json\n
Run Code Online (Sandbox Code Playgroud)\n
归档时间: |
|
查看次数: |
2840 次 |
最近记录: |