Vij*_*uri 7 apache-spark spark-streaming
Spark流以微批处理数据.
使用RDD并行处理每个间隔数据,而不在每个间隔之间共享任何数据.
但我的用例需要在间隔之间共享数据.
考虑网络WordCount示例,该示例生成在该间隔中接收的所有单词的计数.
我如何生成以下字数?
具有先前间隔计数的单词"hadoop"和"spark"的相对计数
所有其他单词的正常字数.
注意:UpdateStateByKey执行有状态处理,但这会在每条记录上应用函数而不是特定记录.
因此,UpdateStateByKey不适合此要求.
更新:
考虑以下示例
间隔1
输入:
Sample Input with Hadoop and Spark on Hadoop
Run Code Online (Sandbox Code Playgroud)
输出:
hadoop 2
sample 1
input 1
with 1
and 1
spark 1
on 1
Run Code Online (Sandbox Code Playgroud)
间隔2
输入:
Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark
Run Code Online (Sandbox Code Playgroud)
输出:
another 3
hadoop 1
spark 2
and 2
sample 1
input 1
with 1
on 1
Run Code Online (Sandbox Code Playgroud)
说明:
第一个间隔给出所有单词的正常单词计数.
在第二个间隔中,hadoop发生了3次,但输出应为1(3-2)
火花发生3次,但输出应为2(3-1)
对于所有其他单词,它应该给出正常的单词计数.
因此,在处理第二个Interval数据时,它应该具有hadoop和spark的第一个区间的字数
这是一个简单的例子.
在实际使用案例中,需要数据共享的字段是RDD元素(RDD)的一部分,并且需要跟踪大量的值.
即,在这个例子中,像hadoop和spark关键字要跟踪近100k个关键字.
Apache Storm中类似的用例:
maa*_*asg 11
这可以通过"记住"最后收到的RDD并使用左连接将该数据与下一个流批处理合并来实现.我们使用它streamingContext.remember
来使流式处理生成的RDD保持在我们需要的时间.
我们利用了dstream.transform
一个在驱动程序上执行的操作,因此我们可以访问所有本地对象定义.特别是我们希望在每个批次上更新具有所需值的最后一个RDD的可变引用.
可能一段代码使这个想法更清晰:
// configure the streaming context to remember the RDDs produced
// choose at least 2x the time of the streaming interval
ssc.remember(xx Seconds)
// Initialize the "currentData" with an empty RDD of the expected type
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD
// classic word count
val w1dstream = dstream.map(elem => (elem,1))
val count = w1dstream.reduceByKey(_ + _)
// Here's the key to make this work. Look how we update the value of the last RDD after using it.
val diffCount = count.transform{ rdd =>
val interestingKeys = Set("hadoop", "spark")
val interesting = rdd.filter{case (k,v) => interestingKeys(k)}
val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))}
currentData = interesting
countDiff
}
diffCount.print()
Run Code Online (Sandbox Code Playgroud)