如何通过Spark打开/流式传输.zip文件?

Jef*_*fLL 7 hadoop apache-spark

我有zip文件,我想打开'通过'Spark.我可以打开.gzip文件没有问题,因为Hadoops本机编解码器支持,但无法使用.zip文件.

有没有一种简单的方法来读取Spark代码中的zip文件?我还搜索了要添加到CompressionCodecFactory的zip编解码器实现,但到目前为止还没有成功.

Tri*_*ima 17

没有python代码的解决方案,我最近不得不在pyspark中读取拉链.而且,在搜索如何做到这一点时,我遇到了这个问题.所以,希望这会帮助别人.

import zipfile
import io

def zip_extract(x):
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = [i for i in file_obj.namelist()]
    return dict(zip(files, [file_obj.open(file).read() for file in files]))


zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,我返回了一个字典,其中zip作为键的文件名,每个文件中的文本数据作为值.你可以改变它,但你想要适合你的目的.


Jef*_*fLL 6

@user3591785 为我指明了正确的方向,所以我将他的答案标记为正确。

有关更多详细信息,我能够搜索 ZipFileInputFormat Hadoop,并发现了此链接:http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/

利用 ZipFileInputFormat 及其助手 ZipfileRecordReader 类,我能够让 Spark 完美地打开和读取 zip 文件。

    rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());
Run Code Online (Sandbox Code Playgroud)

结果是一张只有一个元素的地图。文件名作为键,内容作为值,所以我需要将其转换为 JavaPairRdd。我确信如果您愿意,您可以将 Text 替换为 BytesWritable,并将 ArrayList 替换为其他内容,但我的目标是首先让某些东西运行起来。

JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {

    @Override
    public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
        List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();

        InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));

        String line;

        while ((line = br.readLine()) != null) {

        Tuple2 newTuple = new Tuple2(line.split("\\t")[0],line);
            newList.add(newTuple);
        }
        return newList;
    }
});
Run Code Online (Sandbox Code Playgroud)


Tin*_*nku 5

请尝试以下代码:

using API sparkContext.newAPIHadoopRDD(
    hadoopConf,
    InputFormat.class,
    ImmutableBytesWritable.class, Result.class)
Run Code Online (Sandbox Code Playgroud)

  • 谢谢,但是可以提供一个示例用例吗? (5认同)