use*_*829 10 hadoop scala apache-spark spark-streaming
我使用fileStream从Spark(流上下文)读取hdfs目录中的文件.如果我的Spark关闭并在一段时间后启动,我想读取目录中的新文件.我不想读取已经由Spark读取和处理的目录中的旧文件.我想在这里避免重复.
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File")
Run Code Online (Sandbox Code Playgroud)
任何代码片段都有帮助吗?
Ish*_*mar 20
您可以使用FileSystem API.以下是命令.
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
val outPutPath = new Path("/abc")
if (fs.exists(outPutPath))
fs.delete(outPutPath, true)
Run Code Online (Sandbox Code Playgroud)