使用sc.textFile以递归方式从子目录中获取文件内容

jav*_*dba 19 java apache-spark

似乎SparkContext textFile只希望文件存在于给定的目录位置 - 它也不存在

  • (a)递归或
  • (b)甚至支持目录(尝试将目录读取为文件)

任何建议如何构造递归 - 可能比手动创建递归文件列表/下降逻辑更简单?

这是用例:文件下

/数据/表/ MY_TABLE

我希望能够通过hdfs调用读取该父目录下所有目录级别的所有文件.

UPDATE

sc.textFile()通过(子类)TextInputFormat调用Hadoop FileInputFormat.在逻辑内部存在执行递归目录读取 - 即首先检测条目是否是目录,如果是,则降序:

<!-- language: java -->
     for (FileStatus globStat: matches) {
218          if (globStat.isDir()) {
219            for(FileStatus stat: fs.listStatus(globStat.getPath(),
220                inputFilter)) {
221              result.add(stat);
222            }          
223          } else {
224            result.add(globStat);
225          }
226        }
Run Code Online (Sandbox Code Playgroud)

但是,在调用sc.textFile时,目录条目上存在错误:"not a file".这种行为令人困惑 - 因为似乎有适当的支持来处理目录.

jav*_*dba 39

我在看旧版的FileInputFormat ..

设置递归配置之前mapreduce.input.fileinputformat.input.dir.recursive

scala> sc.textFile("dev/*").count
     java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build
Run Code Online (Sandbox Code Playgroud)

默认值为null/not set,其值为"false":

scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res1: String = null
Run Code Online (Sandbox Code Playgroud)

后:

现在设置值:

sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
Run Code Online (Sandbox Code Playgroud)

现在重试递归操作:

scala>sc.textFile("dev/*/*").count

..
res5: Long = 3481

So it works.
Run Code Online (Sandbox Code Playgroud)

@Ben为每条评论 添加/ 更新完整递归

  • 这非常有用.还值得补充的是,每个级别都需要一个星号.我的数据存储更像`dev/<year>/<month>/<day>`.要获得多个月的数据,我需要语法,如`sc.textFile("dev/2015/*/*").count` (4认同)