在火花流式上下文中将RDD写入HDFS

drs*_*ein 7 hadoop scala hdfs apache-spark spark-streaming

我有一个Spark 1.2.0火花流环境,我从本地文件夹中检索数据,每当我发现一个新文件添加到文件夹时,我执行一些转换.

val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
Run Code Online (Sandbox Code Playgroud)

为了对DStream数据执行分析,我必须将其转换为数组

var arr = new ArrayBuffer[String]();
   data.foreachRDD {
   arr ++= _.collect()
}
Run Code Online (Sandbox Code Playgroud)

然后我使用获得的数据来提取我想要的信息并将它们保存在HDFS上.

val myRDD  = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
Run Code Online (Sandbox Code Playgroud)

由于我真的需要用数据来操作数据,所以不可能在HDFS上保存数据DStream.saveAsTextFiles("...")(这可以正常工作)而且我必须保存RDD但是这个预处理后我终于有了名为part-00000等的空输出文件......

通过arr.foreach(println)我能够看到转换的正确结果.

我的怀疑是,spark会尝试在每个批处理中将数据写入相同的文件,删除以前写的内容.我试图保存在动态命名文件夹中,myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString())但始终只创建一个folds并且输出文件仍为空.

如何在Spark-streaming上下文中将RDD写入HDFS?

Mar*_*ier 7

您正在以未设计的方式使用Spark Streaming.我建议您使用Spark代替您的用例,或者调整您的代码,使其以Spark方式工作.将阵列收集到驱动程序会破坏使用分布式引擎的目的,并使您的应用程序有效地实现单机(两台机器也会比仅在一台机器上处理数据产生更多的开销).

你可以用数组做的一切,你可以用Spark做.因此,只需在流中运行计算,在工作程序上分发,然后使用输出编写输出DStream.saveAsTextFiles().您可以使用foreachRDD+ saveAsParquet(path, overwrite = true)来写入单个文件.