标签: sequencefile

如何从Hadoop序列文件中提取数据?

Hadoop序列文件真的很奇怪.我将图像打包成序列文件,无法恢复图像.我做了一些简单的测试.我发现在使用序列文件之前和之后字节的大小甚至不相同.

Configuration confHadoop = new Configuration();
        FileSystem fs = FileSystem.get(confHadoop);

        String fileName = args[0];
        Path file = new Path(fs.getUri().toString() + "/" + fileName);
        Path seqFile = new Path("/temp.seq");
        SequenceFile.Writer writer = null;
        FSDataInputStream in = null;
        try{
            writer = SequenceFile.createWriter(confHadoop,Writer.file(seqFile), Writer.keyClass(Text.class),
                    Writer.valueClass(BytesWritable.class));

            in = fs.open(file);
            byte buffer[] = IOUtils.toByteArray(in);


            System.out.println("original size ---->  " + String.valueOf(buffer.length));
            writer.append(new Text(fileName), new BytesWritable(buffer));
            System.out.println(calculateMd5(buffer));
            writer.close();

        }finally{
            IOUtils.closeQuietly(in);
        }

        SequenceFile.Reader reader = new SequenceFile.Reader(confHadoop, Reader.file(seqFile));

        Text key = new Text();
        BytesWritable val = new BytesWritable(); …
Run Code Online (Sandbox Code Playgroud)

compression hadoop bytearray sequencefile

2
推荐指数
1
解决办法
2259
查看次数

在 PySpark 中获取序列文件格式的文件的 HDFS 文件路径

我在 HDFS 上的数据是序列文件格式。我正在使用 PySpark(Spark 1.6)并试图实现两件事:

  1. 数据路径包含 yyyy/mm/dd/hh 格式的时间戳,我想将其引入数据本身。我试过 SparkContext.wholeTextFiles 但我认为它可能不支持 Sequence 文件格式。

  2. 如果我想处理一天的数据并想将日期带入数据中,我该如何处理上述问题?在这种情况下,我将加载 yyyy/mm/dd/* 格式的数据。

感谢任何指针。

sequencefile apache-spark pyspark

2
推荐指数
1
解决办法
3995
查看次数

如何在Python中从HDFS序列文件加载数据

我正在运行一个地图缩减程序来读取 HDFS 文件,如下所示:

hadoop jar /opt/mapr/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-dev-streaming.jar -Dmapred.reduce.tasks=1000  -file $homedir/mapper.py -mapper $homedir/mapper.py -file $homedir/reducer.py -reducer $homedir/reducer.py   -input /user/data/* -output /output/ 2> output.text
Run Code Online (Sandbox Code Playgroud)

有什么需要确认的,路径 /user/data/* 包含包含文件的文件夹, /user/data/* 将迭代所有子文件夹下的所有文件,对吗?

hdfs 文本文件的每一行都包含一个 JSON 字符串,因此映射器读取该文件如下:

for line in sys.stdin:
    try:
        object = json.loads(line)
Run Code Online (Sandbox Code Playgroud)

但 HDFS 的所有者将文件从文本更改为序列文件。我发现mapreduce程序输出了很多零大小的文件,这可能意味着它没有成功从HDFS读取文件。

我应该对代码进行哪些更改才能从序列文件中读取内容?我还有一个 HIVE 外部表来根据 mapreduce 的输出执行聚合和排序,并且 HIVE 之前是 STORED AS TEXTFILE ,我应该更改为 STORED AS SEQUENCEFILE 吗?

谢谢,

python hadoop hive mapreduce sequencefile

1
推荐指数
1
解决办法
9019
查看次数

将RDD保存为pyspark中的序列文件

我能够运行此脚本以文本格式保存文件,但是当我尝试运行saveAsSequenceFile时,它出错了.如果有人知道如何将RDD保存为序列文件,请告诉我这个过程.我尝试在"学习Spark"以及官方Spark文档中寻找解决方案.

这成功运行

dataRDD = sc.textFile("/user/cloudera/sqoop_import/departments")
dataRDD.saveAsTextFile("/user/cloudera/pyspark/departments")
Run Code Online (Sandbox Code Playgroud)

这失败了

dataRDD = sc.textFile("/user/cloudera/sqoop_import/departments")
dataRDD.saveAsSequenceFile("/user/cloudera/pyspark/departmentsSeq")
Run Code Online (Sandbox Code Playgroud)

错误:调用z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile时发生错误.:org.apache.spark.SparkException:无法使用java.lang.String类型的RDD元素

这是数据:

2,Fitness
3,Footwear
4,Apparel
5,Golf
6,Outdoors
7,Fan Shop
8,TESTING
8000,TESTING
Run Code Online (Sandbox Code Playgroud)

python sequencefile apache-spark pyspark

1
推荐指数
1
解决办法
7653
查看次数

在PySpark 2.0中读取序列文件

我有一个序列文件,其值如下

(string_value, json_value)
Run Code Online (Sandbox Code Playgroud)

我不关心字符串值.

在Scala我可以通过阅读文件

val reader = sc.sequenceFile[String, String]("/path...")
val data = reader.map{case (x, y) => (y.toString)}
val jsondata = spark.read.json(data)
Run Code Online (Sandbox Code Playgroud)

我很难将其转换为PySpark.我试过用

reader= sc.sequenceFile("/path","org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text")
data = reader.map(lambda x,y: str(y))
jsondata = spark.read.json(data)
Run Code Online (Sandbox Code Playgroud)

这些错误很神秘,但如果有帮助我可以提供.我的问题是,在pySpark2中读取这些序列文件的正确语法是什么?

我想我没有正确地将数组元素转换为字符串.如果我做一些简单的事情,我会得到类似的错误

m = sc.parallelize([(1, 2), (3, 4)])
m.map(lambda x,y: y.toString).collect()
Run Code Online (Sandbox Code Playgroud)

要么

m = sc.parallelize([(1, 2), (3, 4)])
m.map(lambda x,y: str(y)).collect()
Run Code Online (Sandbox Code Playgroud)

谢谢!

sequencefile apache-spark pyspark

1
推荐指数
1
解决办法
4948
查看次数