如何使用wholeTextFiles读取Spark中的gz文件

Yan*_*eld 6 gzip hadoop apache-spark

我有一个包含许多小.gz文件的文件夹(压缩的csv文本文件).我需要在我的Spark工作中阅读它们,但问题是我需要根据文件名中的信息进行一些处理.因此,我没有使用:

JavaRDD<<String>String> input = sc.textFile(...)
Run Code Online (Sandbox Code Playgroud)

因为据我所知,我无法以这种方式访问​​文件名.相反,我用过:

JavaPairRDD<<String>String,String> files_and_content = sc.wholeTextFiles(...);
Run Code Online (Sandbox Code Playgroud)

因为这样我得到了一对文件名和内容.但是,似乎这样,输入阅读器无法从gz文件中读取文本,而是读取二进制Gibberish.

所以,我想知道我是否可以将其设置为以某种方式读取文本,或者使用以下方式访问文件名 sc.textFile(...)

aar*_*man 2

您无法使用wholeTextFiles读取gzip压缩文件,因为它使用CombineFileInputFormat,它无法读取gzip压缩文件,因为它们不可分割(来源证明了这一点):

  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[String, String] = {

    new CombineFileRecordReader[String, String](
      split.asInstanceOf[CombineFileSplit],
      context,
      classOf[WholeTextFileRecordReader])
  }
Run Code Online (Sandbox Code Playgroud)

您也许可以使用newAPIHadoopFilewith wholefileinputformat(不是内置于 hadoop 中,而是在互联网上)来使其正常工作。

更新 1:我认为 WholeFileInputFormat 不起作用,因为它只是获取文件的字节,这意味着您可能必须编写自己的类,可能扩展 WholeFileInputFormat 以确保它解压缩字节。

另一种选择是使用GZipInputStream自己解压缩字节

更新2:如果您有权访问目录名称(如下面OP 的评论中所示),您可以获得这样的所有文件。

Path path = new Path("");
FileSystem fileSystem = path.getFileSystem(new Configuration()); //just uses the default one
FileStatus []  fileStatuses = fileSystem.listStatus(path);
ArrayList<Path> paths = new ArrayList<>();
for (FileStatus fileStatus : fileStatuses) paths.add(fileStatus.getPath());
Run Code Online (Sandbox Code Playgroud)