Cod*_*ome 6 scala apache-spark spark-streaming dstream
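I want to convert a DStream into an array, list, or similar collection so that I can turn it into JSON and serve it from an endpoint. I'm using Apache Spark and ingesting Twitter data. How can I do this with the DStream statuses? I can't seem to get anything to work except print().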
    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.twitter._
    import org.apache.spark.streaming.StreamingContext._
    import TutorialHelper._

    object Tutorial {
      def main(args: Array[String]): Unit = {
        // Location of the Spark directory
        val sparkHome = "/opt/spark"
        // URL of the Spark cluster
        val sparkUrl = "local[8]"
        // Location of the required JAR files
        val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"
        // HDFS directory for checkpointing
        val checkpointDir = "/tmp"

        // Configure Twitter credentials using twitter.txt
        TutorialHelper.configureTwitterCredentials()

        val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))
        val filters = Array("#americasgottalent", "iamawesome")
        val tweets = TwitterUtils.createStream(ssc, None, filters)
        val statuses = tweets.map(status => status.getText())

        val arr = Array("firstval")
        // Attempt that has no effect: `:+` returns a new array, which is discarded
        statuses.foreachRDD {
          arr :+ _.collect()
        }

        ssc.checkpoint(checkpointDir)
        ssc.start()
        ssc.awaitTermination()
      }
    }
aar*_*man 10
If your DStream is named statuses, you can do the following:
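    import scala.collection.mutable.ArrayBuffer

    val arr = new ArrayBuffer[String]()
    statuses.foreachRDD { rdd =>
      // collect() brings the batch to the driver; you can now put it in an array or do whatever you want with it
      arr ++= rdd.collect()
      ...
    }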
Keep in mind that a DStream can be huge, so you may end up with more data than the driver can hold.
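One way to keep driver memory bounded is to retain only the most recent items rather than everything ever collected. The sketch below is plain Scala with no Spark dependency; RecentBuffer and its capacity parameter are hypothetical names introduced here for illustration, not part of the Spark API. It is synchronized because the streaming job and whatever serves the endpoint will typically run on different threads.

```scala
import scala.collection.mutable

// Hypothetical helper (not part of Spark): keeps at most `capacity` recent items.
final class RecentBuffer[A](capacity: Int) {
  require(capacity > 0, "capacity must be positive")

  private val queue = mutable.Queue.empty[A]

  // Append a batch, then drop the oldest items beyond the capacity.
  def addAll(batch: Iterable[A]): Unit = synchronized {
    queue ++= batch
    while (queue.size > capacity) queue.dequeue()
  }

  // Immutable copy, safe to hand to another thread (e.g. an HTTP handler).
  def snapshot: List[A] = synchronized { queue.toList }
}
```

Assuming a buffer created as val recent = new RecentBuffer[String](1000), it could be filled from the stream with statuses.foreachRDD(rdd => recent.addAll(rdd.take(100))). Using take(100) instead of collect() also caps how much of each batch is pulled into the driver; both numbers are arbitrary and would need tuning.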
It turns out you were close, but what I was ultimately looking for is:
    statuses.foreachRDD { rdd =>
      // collect() already returns an Array, so no further conversion is needed
      for (item <- rdd.collect()) {
        println(item)
      }
    }
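Since the original goal was to serve the tweets as JSON, the collected strings still need to be encoded. The following is a minimal hand-rolled sketch in plain Scala; JsonArray, escape, and render are hypothetical names chosen for illustration, and in a real project an established JSON library would likely be a better choice.

```scala
// Hypothetical helper: renders a sequence of strings as a JSON array.
object JsonArray {
  // Escape the characters that JSON requires to be escaped inside a string.
  def escape(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '"'  => sb.append('\\').append('"')
      case '\\' => sb.append('\\').append('\\')
      case '\n' => sb.append('\\').append('n')
      case '\r' => sb.append('\\').append('r')
      case '\t' => sb.append('\\').append('t')
      // Remaining control characters are written as a backslash-u hex escape.
      case c if c < ' ' => sb.append('\\').append("u%04x".format(c.toInt))
      case c => sb.append(c)
    }
    sb.toString
  }

  // Quote each item and join the items into a JSON array literal.
  def render(items: Seq[String]): String =
    items.map(s => "\"" + escape(s) + "\"").mkString("[", ",", "]")
}
```

Inside foreachRDD, the array returned by rdd.collect() could be passed to JsonArray.render and the resulting string stored wherever the endpoint reads it from.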