Spark Streaming:HDFS

sop*_*hie 3 hadoop hdfs apache-spark spark-streaming

  1. 我无法让我的Spark工作从HDFS流式传输"旧"文件.

如果由于某种原因(例如演示,部署)我的Spark作业停止了,但写入/移动到HDFS目录是连续的,我可能会在启动Spark Streaming Job后跳过这些文件.

    val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")

    hdfsDStream.foreachRDD(
      rdd => logInfo("Number of records in this batch: " + rdd.count())
    )
Run Code Online (Sandbox Code Playgroud)

输出 - >此批次中的记录数:0

  1. Spark Streaming有没有办法将"读取"文件移动到另一个文件夹?或者我们必须手动编程?因此,它将避免读取已经"读取"的文件.

  2. Spark Streaming与在CRON中运行spark job(sc.textFile)相同吗?

Jus*_*ony 7

正如Dean所提到的,textFileStream使用的默认值仅使用新文件.

  def textFileStream(directory: String): DStream[String] = {
    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
  }
Run Code Online (Sandbox Code Playgroud)

所以,它正在做的就是调用这个变体 fileStream

def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ] (directory: String): InputDStream[(K, V)] = {
    new FileInputDStream[K, V, F](this, directory)
  }
Run Code Online (Sandbox Code Playgroud)

并且,查看FileInputDStream该类,我们将看到它确实可以查找现有文件,但默认为仅新:

newFilesOnly: Boolean = true,
Run Code Online (Sandbox Code Playgroud)

所以,回到StreamingContext代码中,我们可以看到通过直接调用fileStream方法可以使用和重载:

def fileStream[
 K: ClassTag,
 V: ClassTag,
 F <: NewInputFormat[K, V]: ClassTag] 
(directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}
Run Code Online (Sandbox Code Playgroud)

所以,TL; DR; 是

ssc.fileStream[LongWritable, Text, TextInputFormat]
    (directory, FileInputDStream.defaultFilter, false).map(_._2.toString)
Run Code Online (Sandbox Code Playgroud)