如何使用`ssc.fileStream()`读取镶木地板文件?传递给`ssc.fileStream()`的类型是什么?

Ror*_*rne 6 hadoop scala apache-spark hadoop2 spark-streaming

我的星火公司的理解fileStream()方法是,它需要三种类型作为参数:Key,Value,和Format.在文本文件的情况下,适当的类型有:LongWritable,Text,和TextInputFormat.

首先,我想了解这些类型的性质.直觉上,我猜想Key在这种情况下,文件的行号是该行Value的文本.因此,在以下文本文件示例中:

Hello
Test
Another Test
Run Code Online (Sandbox Code Playgroud)

的第一行DStream会拥有Key1(0?)和ValueHello.

它是否正确?


我的问题的第二部分:我看了反编译的实现,ParquetInputFormat我发现了一些好奇的东西:

public class ParquetInputFormat<T>
       extends FileInputFormat<Void, T> {
//...

public class TextInputFormat
       extends FileInputFormat<LongWritable, Text>
       implements JobConfigurable {
//...
Run Code Online (Sandbox Code Playgroud)

TextInputFormat延伸FileInputFormat的类型LongWritableText,而ParquetInputFormat扩展了同一类的类型VoidT.

这是否意味着我必须创建一个Value类来保存我的镶木地板数据的整行,然后将类型传递<Void, MyClass, ParquetInputFormat<MyClass>>ssc.fileStream()

如果是这样,我应该如何实施MyClass


编辑1:我注意到readSupportClass要将其传递给ParquetInputFormat对象.这是什么类,它是如何用于解析镶木地板文件的?是否有一些文件涵盖了这一点?


编辑2:据我所知,这是不可能的.如果有人知道如何将镶木地板文件流式传输到Spark,请随时分享...

tab*_*ata 6

我在Spark Streaming中读取镶木地板文件的示例如下.

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
  directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)

val lines = stream.map(row => {
  println("row:" + row.toString())
  row
})
Run Code Online (Sandbox Code Playgroud)

有些要点......

  • 记录类型是GenericRecord
  • readSupportClass是AvroReadSupport
  • 将配置传递给fileStream
  • 将parquet.read.support.class设置为Configuration

我在下面提到了创建样本的源代码.
我也找不到好的例子.
我想等一个好.

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob /master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/流/ DSTREAM/FileInputDStream.scala