use*_*478 9 distributed scala apache-spark spark-streaming
这是一个用scala编写的火花流程序.它计算每1秒钟来自套接字的字数.结果将是单词count,例如,从0到1的单词计数,以及从1到2的单词计数.但是我想知道是否有某种方法可以改变这个程序以便我们可以累积字数?也就是说,字数从时间0到现在为止.
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
aar*_*man 10
你可以使用一个StateDStream
.火花例子中有一个有状态字数的例子.
object StatefulNetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
// Update the cumulative count using updateStateByKey
// This will give a Dstream made of state (which is the cumulative count of the words)
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
Run Code Online (Sandbox Code Playgroud)
它的工作方式是你Seq[T]
为每个批次获得一个,然后你更新一个Option[T]
像累加器一样的行为.它是一个原因Option
是因为在第一批它将None
保持这种方式,除非它更新.在这个例子中,count是一个int,如果你正在处理大量的数据,你甚至可能想要一个Long
或BigInt
归档时间: |
|
查看次数: |
4369 次 |
最近记录: |