lon*_*tar 5 java scala 7zip hdfs apache-spark
我正在尝试使用 scala 或 java 读取 Spark .7z 文件。我没有找到任何合适的方法或功能。
对于 zip 文件,我能够读取,因为 ZipInputStream 类采用输入流,但对于 7Z 文件,SevenZFile 类不采用任何输入流。 https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/SevenZFile.html
压缩文件代码
spark.sparkContext.binaryFiles("fileName").flatMap{case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}}
Run Code Online (Sandbox Code Playgroud)
我正在尝试类似的 7z 文件代码
spark.sparkContext.binaryFiles(""filename"").flatMap{case (name: String, content: PortableDataStream) =>
val zis = new SevenZFile(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}}
Run Code Online (Sandbox Code Playgroud)
但 SevenZFile 不接受这些格式。寻找想法。
如果文件位于本地文件系统中,则以下解决方案有效,但我的文件位于 hdfs 中
本地文件系统代码
public static void decompress(String in, File destination) throws IOException {
SevenZFile sevenZFile = new SevenZFile(new File(in));
SevenZArchiveEntry entry;
while ((entry = sevenZFile.getNextEntry()) != null){
if (entry.isDirectory()){
continue;
}
File curfile = new File(destination, entry.getName());
File parent = curfile.getParentFile();
if (!parent.exists()) {
parent.mkdirs();
}
FileOutputStream out = new FileOutputStream(curfile);
byte[] content = new byte[(int) entry.getSize()];
sevenZFile.read(content, 0, content.length);
out.write(content);
out.close();
}
}
Run Code Online (Sandbox Code Playgroud)
经过这么多年的 Spark 演变,应该有一种简单的方法可以做到这一点。
java.io.File您可以尝试SeekableByteChannel此替代构造函数中所示的方法,而不是使用基于 - 的方法。
您可以使用SeekableInMemoryByteChannel来读取字节数组。因此,只要您可以从 S3 或其他地方获取 7zip 文件并将它们作为字节数组传递,就应该没问题。
综上所述,Spark 确实不太适合处理 zip 和 7zip 文件等内容。我可以从个人经验告诉你,一旦文件太大而 Spark 的执行器无法处理,它就会严重失败。
像 Apache NiFi 这样的东西可以更好地扩展档案并处理它们。FWIW,我目前正在处理一个大型数据转储,这让我经常处理其中包含数百万个文件的 50GB tarball,NiFi 非常优雅地处理它们。