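This is a Spark Streaming program written in Scala. It counts the words arriving from a socket in each 1-second interval, so the result is a per-interval word count: for example, the count for time 0 to 1, then the count for time 1 to 2. Is there some way to change this program so that the word count accumulates? That is, I want the count of words from time 0 up to now.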
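import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
val sparkConf = new SparkConf().setAppName("NetworkWordCount")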
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (e.g. generated by 'nc')
// Note: this storage level has no replication, which is acceptable only when running locally.
// In a distributed setting, use a replicated level (e.g. MEMORY_AND_DISK_SER_2) for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
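One way to get a running total is Spark Streaming's stateful `updateStateByKey` operation, which carries per-key state from one batch to the next and requires a checkpoint directory to be set with `ssc.checkpoint(...)` (newer releases, 1.6 and later, also offer `mapWithState`). The sketch below is a minimal illustration under stated assumptions: it defines an update function with the shape `updateStateByKey` expects, shows the Spark wiring only as a comment (`"checkpoint-dir"` is a placeholder path), and simulates a few batches in plain Scala so the accumulation can be checked without a cluster. `RunningWordCount` and `simulate` are hypothetical names.

```scala
// Assumed wiring inside the streaming job above (not executed here):
//   ssc.checkpoint("checkpoint-dir")  // placeholder path; stateful ops need checkpointing
//   val runningCounts = words.map(x => (x, 1)).updateStateByKey[Int](RunningWordCount.updateCount _)
//   runningCounts.print()
object RunningWordCount {
  // Update function of the form (new values for a key in this batch, previous state) => new state.
  // Returning None instead would drop the key from the state.
  def updateCount(newCounts: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(newCounts.sum + runningCount.getOrElse(0))

  // Hypothetical local simulation: for every batch, call updateCount once per key that
  // is already in the state or appears in the new batch, mimicking updateStateByKey.
  def simulate(batches: Seq[Seq[String]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int]) { (state, batch) =>
      // Per-batch values for each word: one 1 per occurrence, like words.map(x => (x, 1)).
      val newValues: Map[String, Seq[Int]] =
        batch.groupBy(identity).map { case (word, occurrences) => word -> occurrences.map(_ => 1) }
      (state.keySet ++ newValues.keySet).flatMap { word =>
        updateCount(newValues.getOrElse(word, Seq.empty[Int]), state.get(word)).map(count => word -> count)
      }.toMap
    }

  def main(args: Array[String]): Unit = {
    // Three "batches" standing in for three consecutive 1-second intervals.
    val batches = Seq(Seq("a", "b", "a"), Seq("b", "c"), Seq("a"))
    println(simulate(batches))
  }
}
```

Because the update function is called even for keys with no new values in a batch, words that stop arriving keep their last total rather than dropping back to zero.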
The following is simple code to get word counts over a window size of 30 seconds and a slide size of 10 seconds.
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming.api._
import org.apache.spark.storage.StorageLevel
// sc is the SparkContext provided by spark-shell
val ssc = new StreamingContext(sc, Seconds(5))
// read new text files as they appear in the directory "test"
val lines0 = ssc.textFileStream("test")
val words0 = lines0.flatMap(_.split(" "))
// read from socket
val lines1 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words1 = lines1.flatMap(_.split(" "))
val words = words0.union(words1)
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
wordCounts.print()
ssc.checkpoint(".")
ssc.start()
ssc.awaitTermination()
However, I get an error from this line:
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
Specifically, the error comes from the _ + _ part. The error is …
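The error text is cut off, so the cause can't be confirmed, but a frequent culprit is that `reduceByKeyAndWindow` is overloaded (there are also variants taking an inverse reduce function), and the Scala compiler may then be unable to infer the parameter types of the placeholder lambda `_ + _`. Writing the types out, as in `(a: Int, b: Int) => a + b`, usually avoids this. The sketch below keeps the corrected Spark call as a comment and models the window semantics in plain Scala, assuming 5-second batches, so a 30-second window spans 6 batches and a 10-second slide is 2 batches. `WindowedWordCount` and `windowedCounts` are hypothetical names, and the model only approximates Spark's behavior (early windows are partial, as in Spark).

```scala
// Assumed fix in the streaming job (not executed here):
//   val sum = (a: Int, b: Int) => a + b
//   val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(sum, Seconds(30), Seconds(10))
object WindowedWordCount {
  // Reduce function with explicit parameter types, so nothing has to be inferred
  // from an overloaded method signature.
  val sum: (Int, Int) => Int = (a: Int, b: Int) => a + b

  // Hypothetical local model: every `slideBatches` batches, count words over the
  // most recent `windowBatches` batches (fewer at the start of the stream).
  def windowedCounts(batches: Seq[Seq[String]], windowBatches: Int, slideBatches: Int): Seq[Map[String, Int]] =
    (slideBatches to batches.length by slideBatches).map { end =>
      val window = batches.slice(math.max(0, end - windowBatches), end)
      window.flatten
        .map(word => (word, 1))
        .groupBy(_._1)
        .map { case (word, pairs) => word -> pairs.map(_._2).reduce(sum) }
    }

  def main(args: Array[String]): Unit = {
    // Eight 5-second batches; window = 30 s / 5 s = 6 batches, slide = 10 s / 5 s = 2 batches.
    val batches = Seq(Seq("a"), Seq("b"), Seq("a"), Seq("a"), Seq("b"), Seq("c"), Seq("a"), Seq("c"))
    windowedCounts(batches, windowBatches = 6, slideBatches = 2).foreach(println)
  }
}
```

Each emitted map covers at most the last 30 seconds of data, so the counts can go down as old batches leave the window, unlike the running total produced by `updateStateByKey`.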
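Some people say that Spark Streaming, even though it can process streams in the form of micro-batches, is still not a true streaming computation system like Storm. So what are the limiting factors of this micro-batch computation model? What makes it inferior to a true stream computing system? Thanks!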
set i 0
set student$i tom
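(Of course, this is equivalent to set student0 tom.)
I want to get the value of student0, i.e. the string "tom". If I have to use $i here to stand for 0, how can I get it? I tried $(student$i), $"student$i" and many other ways, but I can't get the string "tom". (I don't want to hard-code $student0 here.)
Is there any way to solve this problem? Thanks!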