Ror*_*rne 6 hadoop scala apache-spark hadoop2 spark-streaming
我的星火公司的理解fileStream()方法是,它需要三种类型作为参数:Key,Value,和Format.在文本文件的情况下,适当的类型有:LongWritable,Text,和TextInputFormat.
首先,我想了解这些类型的性质.直觉上,我猜想Key在这种情况下,文件的行号是该行Value的文本.因此,在以下文本文件示例中:
Hello
Test
Another Test
Run Code Online (Sandbox Code Playgroud)
的第一行DStream会拥有Key的1(0?)和Value的Hello.
它是否正确?
我的问题的第二部分:我看了反编译的实现,ParquetInputFormat我发现了一些好奇的东西:
public class ParquetInputFormat<T>
extends FileInputFormat<Void, T> {
//...
public class TextInputFormat
extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
//...
Run Code Online (Sandbox Code Playgroud)
TextInputFormat延伸FileInputFormat的类型LongWritable和Text,而ParquetInputFormat扩展了同一类的类型Void和T.
这是否意味着我必须创建一个Value类来保存我的镶木地板数据的整行,然后将类型传递<Void, MyClass, ParquetInputFormat<MyClass>>给ssc.fileStream()?
如果是这样,我应该如何实施MyClass?
编辑1:我注意到readSupportClass要将其传递给ParquetInputFormat对象.这是什么类,它是如何用于解析镶木地板文件的?是否有一些文件涵盖了这一点?
编辑2:据我所知,这是不可能的.如果有人知道如何将镶木地板文件流式传输到Spark,请随时分享...
我在Spark Streaming中读取镶木地板文件的示例如下.
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)
val lines = stream.map(row => {
println("row:" + row.toString())
row
})
Run Code Online (Sandbox Code Playgroud)
有些要点......
我在下面提到了创建样本的源代码.
我也找不到好的例子.
我想等一个好.
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob /master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/流/ DSTREAM/FileInputDStream.scala
| 归档时间: |
|
| 查看次数: |
3038 次 |
| 最近记录: |