Spark 读取 .7z 文件

lon*_*tar 5 java scala 7zip hdfs apache-spark

我正在尝试使用 scala 或 java 读取 Spark .7z 文件。我没有找到任何合适的方法或功能。

对于 zip 文件,我能够读取,因为 ZipInputStream 类采用输入流,但对于 7Z 文件,SevenZFile 类不采用任何输入流。 https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/SevenZFile.html

压缩文件代码

spark.sparkContext.binaryFiles("fileName").flatMap{case (name: String, content: PortableDataStream) =>
        val zis = new ZipInputStream(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}
Run Code Online (Sandbox Code Playgroud)

我正在尝试类似的 7z 文件代码

spark.sparkContext.binaryFiles(""filename"").flatMap{case (name: String, content: PortableDataStream) =>
        val zis = new SevenZFile(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}
Run Code Online (Sandbox Code Playgroud)

但 SevenZFile 不接受这些格式。寻找想法。

如果文件位于本地文件系统中,则以下解决方案有效,但我的文件位于 hdfs 中

本地文件系统代码

 public static void decompress(String in, File destination) throws IOException {
        SevenZFile sevenZFile = new SevenZFile(new File(in));
        SevenZArchiveEntry entry;
        while ((entry = sevenZFile.getNextEntry()) != null){
            if (entry.isDirectory()){
                continue;
            }
            File curfile = new File(destination, entry.getName());
            File parent = curfile.getParentFile();
            if (!parent.exists()) {
                parent.mkdirs();
            }
            FileOutputStream out = new FileOutputStream(curfile);
            byte[] content = new byte[(int) entry.getSize()];
            sevenZFile.read(content, 0, content.length);
            out.write(content);
            out.close();
        }
    }
Run Code Online (Sandbox Code Playgroud)

经过这么多年的 Spark 演变,应该有一种简单的方法可以做到这一点。

Mik*_*sen 4

java.io.File您可以尝试SeekableByteChannel替代构造函数中所示的方法,而不是使用基于 - 的方法。

您可以使用SeekableInMemoryByteChannel来读取字节数组。因此,只要您可以从 S3 或其他地方获取 7zip 文件并将它们作为字节数组传递,就应该没问题。

综上所述,Spark 确实不太适合处理 zip 和 7zip 文件等内容。我可以从个人经验告诉你,一旦文件太大而 Spark 的执行器无法处理,它就会严重失败。

像 Apache NiFi 这样的东西可以更好地扩展档案并处理它们。FWIW,我目前正在处理一个大型数据转储,这让我经常处理其中包含数百万个文件的 50GB tarball,NiFi 非常优雅地处理它们。