Spark Streaming DStream RDD: how to get the file name

Vij*_*uri 9 scala apache-spark

Spark Streaming's textFileStream and fileStream can monitor a directory and process the new files as a DStream of RDDs.

How can I get the names of the files a DStream RDD is processing in a particular interval?

non*_*epr 5

fileStream produces a UnionRDD of NewHadoopRDDs. The nice part about the NewHadoopRDDs created by sc.newAPIHadoopFile is that their names are set to their file paths.
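As a minimal illustration of that fact (an untested sketch; it assumes an existing StreamingContext `ssc`, and the directory path is a placeholder), you can read those names straight off each batch RDD's dependencies:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val stream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/some/directory")
    stream.foreachRDD { rdd =>
      // Each dependency is one NewHadoopRDD whose name is the path of the file it was read from.
      rdd.dependencies.foreach(dep => println(s"processing: ${dep.rdd.name}"))
    }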

Here is an example of what you can do with this knowledge:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import scala.reflect.ClassTag

// Rebuild the batch RDD as a union whose member RDDs keep their file-path names.
def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
  ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
    .transform( rdd =>
      new UnionRDD(rdd.context,
        rdd.dependencies.map( dep =>
          dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]].map(_._2.toString).setName(dep.rdd.name)
        )
      )
    )

// Apply a per-file transformation: transformFunc receives the file name, then that file's RDD.
def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                 transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
  new UnionRDD(unionrdd.context,
    unionrdd.dependencies.map{ dep =>
      if (dep.rdd.isEmpty) None
      else {
        val filename = dep.rdd.name
        Some(
          transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
            .setName(filename)
        )
      }
    }.flatten
  )
}

def main(args: Array[String]) = {
  val conf = new SparkConf()
    .setAppName("Process by file")
    .setMaster("local[2]")

  val ssc = new StreamingContext(conf, Seconds(30))

  val dstream = namedTextFileStream(ssc, "/some/directory")

  def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
    rdd.map(line => (filename, line))

  val transformed = dstream.
    transform(rdd => transformByFile(rdd, byFileTransformer))

  // Do some stuff with transformed

  ssc.start()
  ssc.awaitTermination()
}
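
An alternative, in case relying on RDD names feels fragile: NewHadoopRDD exposes mapPartitionsWithInputSplit, and for file-based input formats each split is a FileSplit carrying its path. A rough, untested sketch along those lines (the function name `linesWithFileName` is made up for this example):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.InputSplit
    import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
    import org.apache.spark.rdd.NewHadoopRDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream

    // Pair every line with the path of the input split it came from.
    def linesWithFileName(ssc: StreamingContext, directory: String): DStream[(String, String)] =
      ssc.fileStream[LongWritable, Text, TextInputFormat](directory).transform { rdd =>
        val perFile = rdd.dependencies.collect {
          case dep if dep.rdd.isInstanceOf[NewHadoopRDD[_, _]] =>
            dep.rdd.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
              .mapPartitionsWithInputSplit { (split: InputSplit, it: Iterator[(LongWritable, Text)]) =>
                val file = split.asInstanceOf[FileSplit].getPath.toString
                it.map { case (_, line) => (file, line.toString) }
              }
        }
        rdd.context.union(perFile)
      }

This avoids the setName/name round trip entirely, at the cost of a cast to the concrete NewHadoopRDD type.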