如何让 Spark 会话递归读取所有文件?

Sau*_*ahu 6 regex recursion scala apache-spark

显示存储 JSON 文件的目录

$ tree -d try/
try/
??? 10thOct_logs1
??? 11thOct
?   ??? logs2
??? Oct
    ??? 12th
        ??? logs3
Run Code Online (Sandbox Code Playgroud)

任务是使用SparkSession.

有没有一种优雅的方式来读取目录中的所有文件,然后递归地读取子目录

我尝试过的命令很少会导致意外排除。

spark.read.json("file:///var/foo/try/<exp>")

+----------+---+-----+-------+
| <exp> -> | * | */* | */*/* |
+----------+---+-----+-------+
| logs1    | y | y   | n     |
| logs2    | n | y   | y     |
| logs3    | n | n   | y     |
+----------+---+-----+-------+
Run Code Online (Sandbox Code Playgroud)

您可以在上表中看到,三个表达式中没有一个同时匹配所有目录(位于 3 个不同深度)。坦率地说,我没想到10thOct_logs1在使用第三个表达式时会排除*/*/*

这让我得出结论,任何与最后一个表达式匹配的文件或目录路径/都被视为完全匹配,其他所有内容都将被忽略。

bla*_*hop 9

更新

Spark 3 中引入了一个新选项来读取嵌套文件夹recursiveFileLookup

spark.read.option("recursiveFileLookup", "true").json("file:///var/foo/try")
Run Code Online (Sandbox Code Playgroud)

对于旧版本,或者,您可以使用 HadooplistFiles递归列出所有文件路径,然后将它们传递给 Spark 读取:

import org.apache.hadoop.fs.{Path}

val conf = sc.hadoopConfiguration

// get all file paths
val fromFolder = new Path("file:///var/foo/try/")
val logfiles = fromFolder.getFileSystem(conf).listFiles(fromFolder, true)
var files = Seq[String]()
while (logfiles.hasNext) {
       // one can filter here some specific files
       files = files :+ logfiles.next().getPath().toString
}

// read multiple paths
val df = spark.read.csv(files: _*)

df.select(input_file_name()).distinct().show(false)


+-------------------------------------+
|input_file_name()                    |
+-------------------------------------+
|file:///var/foo/try/11thOct/log2.csv |
|file:///var/foo/try/10thOct_logs1.csv|
|file:///var/foo/try/Oct/12th/log3.csv|
+-------------------------------------+
Run Code Online (Sandbox Code Playgroud)


dre*_*-hh 5

不幸的是,hadoop glob 不支持递归 glob。请参阅查询文件系统#文件模式

有一个选项可以为每个目录级别列出多个全局变量。

{a,b} 交替 匹配表达式 a 或 b

您必须小心不要将同一文件匹配两次,否则它将显示为重复。

spark.read.json("./try/{*logs*,*/*logs*,*/*/*logs*}")
Run Code Online (Sandbox Code Playgroud)

您还可以加载多个数据帧并将它们合并

val dfs = List(
  spark.read.json("./try/*logs*"),
  spark.read.json("./try/*/*logs*"),
  spark.read.json("./try/*/*/*logs*")
)
val df = dfs.reduce(_ union _)

Run Code Online (Sandbox Code Playgroud)