Hadoop序列文件真的很奇怪.我将图像打包成序列文件,无法恢复图像.我做了一些简单的测试.我发现在使用序列文件之前和之后字节的大小甚至不相同.
Configuration confHadoop = new Configuration();
FileSystem fs = FileSystem.get(confHadoop);
String fileName = args[0];
Path file = new Path(fs.getUri().toString() + "/" + fileName);
Path seqFile = new Path("/temp.seq");
SequenceFile.Writer writer = null;
FSDataInputStream in = null;
try{
writer = SequenceFile.createWriter(confHadoop,Writer.file(seqFile), Writer.keyClass(Text.class),
Writer.valueClass(BytesWritable.class));
in = fs.open(file);
byte buffer[] = IOUtils.toByteArray(in);
System.out.println("original size ----> " + String.valueOf(buffer.length));
writer.append(new Text(fileName), new BytesWritable(buffer));
System.out.println(calculateMd5(buffer));
writer.close();
}finally{
IOUtils.closeQuietly(in);
}
SequenceFile.Reader reader = new SequenceFile.Reader(confHadoop, Reader.file(seqFile));
Text key = new Text();
BytesWritable val = new BytesWritable(); …Run Code Online (Sandbox Code Playgroud) 我在 HDFS 上的数据是序列文件格式。我正在使用 PySpark(Spark 1.6)并试图实现两件事:
数据路径包含 yyyy/mm/dd/hh 格式的时间戳,我想将其引入数据本身。我试过 SparkContext.wholeTextFiles 但我认为它可能不支持 Sequence 文件格式。
如果我想处理一天的数据并想将日期带入数据中,我该如何处理上述问题?在这种情况下,我将加载 yyyy/mm/dd/* 格式的数据。
感谢任何指针。
我正在运行一个地图缩减程序来读取 HDFS 文件,如下所示:
hadoop jar /opt/mapr/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-dev-streaming.jar -Dmapred.reduce.tasks=1000 -file $homedir/mapper.py -mapper $homedir/mapper.py -file $homedir/reducer.py -reducer $homedir/reducer.py -input /user/data/* -output /output/ 2> output.text
Run Code Online (Sandbox Code Playgroud)
有什么需要确认的,路径 /user/data/* 包含包含文件的文件夹, /user/data/* 将迭代所有子文件夹下的所有文件,对吗?
hdfs 文本文件的每一行都包含一个 JSON 字符串,因此映射器读取该文件如下:
for line in sys.stdin:
try:
object = json.loads(line)
Run Code Online (Sandbox Code Playgroud)
但 HDFS 的所有者将文件从文本更改为序列文件。我发现mapreduce程序输出了很多零大小的文件,这可能意味着它没有成功从HDFS读取文件。
我应该对代码进行哪些更改才能从序列文件中读取内容?我还有一个 HIVE 外部表来根据 mapreduce 的输出执行聚合和排序,并且 HIVE 之前是 STORED AS TEXTFILE ,我应该更改为 STORED AS SEQUENCEFILE 吗?
谢谢,
我能够运行此脚本以文本格式保存文件,但是当我尝试运行saveAsSequenceFile时,它出错了.如果有人知道如何将RDD保存为序列文件,请告诉我这个过程.我尝试在"学习Spark"以及官方Spark文档中寻找解决方案.
这成功运行
dataRDD = sc.textFile("/user/cloudera/sqoop_import/departments")
dataRDD.saveAsTextFile("/user/cloudera/pyspark/departments")
Run Code Online (Sandbox Code Playgroud)
这失败了
dataRDD = sc.textFile("/user/cloudera/sqoop_import/departments")
dataRDD.saveAsSequenceFile("/user/cloudera/pyspark/departmentsSeq")
Run Code Online (Sandbox Code Playgroud)
错误:调用z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile时发生错误.:org.apache.spark.SparkException:无法使用java.lang.String类型的RDD元素
这是数据:
2,Fitness
3,Footwear
4,Apparel
5,Golf
6,Outdoors
7,Fan Shop
8,TESTING
8000,TESTING
Run Code Online (Sandbox Code Playgroud) 我有一个序列文件,其值如下
(string_value, json_value)
Run Code Online (Sandbox Code Playgroud)
我不关心字符串值.
在Scala我可以通过阅读文件
val reader = sc.sequenceFile[String, String]("/path...")
val data = reader.map{case (x, y) => (y.toString)}
val jsondata = spark.read.json(data)
Run Code Online (Sandbox Code Playgroud)
我很难将其转换为PySpark.我试过用
reader= sc.sequenceFile("/path","org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text")
data = reader.map(lambda x,y: str(y))
jsondata = spark.read.json(data)
Run Code Online (Sandbox Code Playgroud)
这些错误很神秘,但如果有帮助我可以提供.我的问题是,在pySpark2中读取这些序列文件的正确语法是什么?
我想我没有正确地将数组元素转换为字符串.如果我做一些简单的事情,我会得到类似的错误
m = sc.parallelize([(1, 2), (3, 4)])
m.map(lambda x,y: y.toString).collect()
Run Code Online (Sandbox Code Playgroud)
要么
m = sc.parallelize([(1, 2), (3, 4)])
m.map(lambda x,y: str(y)).collect()
Run Code Online (Sandbox Code Playgroud)
谢谢!
sequencefile ×5
apache-spark ×3
pyspark ×3
hadoop ×2
python ×2
bytearray ×1
compression ×1
hive ×1
mapreduce ×1