sop*_*hie 3 hadoop hdfs apache-spark spark-streaming
如果由于某种原因(例如演示,部署)我的Spark作业停止了,但写入/移动到HDFS目录是连续的,我可能会在启动Spark Streaming Job后跳过这些文件.
val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")
hdfsDStream.foreachRDD(
rdd => logInfo("Number of records in this batch: " + rdd.count())
)
Run Code Online (Sandbox Code Playgroud)
输出 - >此批次中的记录数:0
Spark Streaming有没有办法将"读取"文件移动到另一个文件夹?或者我们必须手动编程?因此,它将避免读取已经"读取"的文件.
Spark Streaming与在CRON中运行spark job(sc.textFile)相同吗?
正如Dean所提到的,textFileStream使用的默认值仅使用新文件.
def textFileStream(directory: String): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
Run Code Online (Sandbox Code Playgroud)
所以,它正在做的就是调用这个变体 fileStream
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory)
}
Run Code Online (Sandbox Code Playgroud)
并且,查看FileInputDStream
该类,我们将看到它确实可以查找现有文件,但默认为仅新:
newFilesOnly: Boolean = true,
Run Code Online (Sandbox Code Playgroud)
所以,回到StreamingContext
代码中,我们可以看到通过直接调用fileStream
方法可以使用和重载:
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag]
(directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}
Run Code Online (Sandbox Code Playgroud)
ssc.fileStream[LongWritable, Text, TextInputFormat]
(directory, FileInputDStream.defaultFilter, false).map(_._2.toString)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
4642 次 |
最近记录: |