批处理之间的Spark流数据共享

Vij*_*uri 7 apache-spark spark-streaming

Spark流以微批处理数据.

使用RDD并行处理每个间隔数据,而不在每个间隔之间共享任何数据.

但我的用例需要在间隔之间共享数据.

考虑网络WordCount示例,该示例生成在该间隔中接收的所有单词的计数.

我如何生成以下字数?

  • 具有先前间隔计数的单词"hadoop"和"spark"的相对计数

  • 所有其他单词的正常字数.

注意:UpdateStateByKey执行有状态处理,但这会在每条记录上应用函数而不是特定记录.

因此,UpdateStateByKey不适合此要求.

更新:

考虑以下示例

间隔1

输入:

Sample Input with Hadoop and Spark on Hadoop
Run Code Online (Sandbox Code Playgroud)

输出:

hadoop  2
sample  1
input   1
with    1
and 1
spark   1
on  1
Run Code Online (Sandbox Code Playgroud)

间隔2

输入:

Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark
Run Code Online (Sandbox Code Playgroud)

输出:

another 3
hadoop  1
spark   2
and 2
sample  1
input   1
with    1
on  1
Run Code Online (Sandbox Code Playgroud)

说明:

第一个间隔给出所有单词的正常单词计数.

在第二个间隔中,hadoop发生了3次,但输出应为1(3-2)

火花发生3次,但输出应为2(3-1)

对于所有其他单词,它应该给出正常的单词计数.

因此,在处理第二个Interval数据时,它应该具有hadoopspark的第一个区间的字数

这是一个简单的例子.

在实际使用案例中,需要数据共享的字段是RDD元素(RDD)的一部分,并且需要跟踪大量的值.

即,在这个例子中,像hadoop和spark关键字要跟踪近100k个关键字.

Apache Storm中类似的用例:

风暴中的分布式缓存

风暴交易词

maa*_*asg 11

这可以通过"记住"最后收到的RDD并使用左连接将该数据与下一个流批处理合并来实现.我们使用它streamingContext.remember来使流式处理生成的RDD保持在我们需要的时间.

我们利用了dstream.transform一个在驱动程序上执行的操作,因此我们可以访问所有本地对象定义.特别是我们希望在每个批次上更新具有所需值的最后一个RDD的可变引用.

可能一段代码使这个想法更清晰:

// configure the streaming context to remember the RDDs produced
// choose at least 2x the time of the streaming interval
ssc.remember(xx Seconds)  

// Initialize the "currentData" with an empty RDD of the expected type
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD

// classic word count
val w1dstream = dstream.map(elem => (elem,1))    
val count = w1dstream.reduceByKey(_ + _)    

// Here's the key to make this work. Look how we update the value of the last RDD after using it. 
val diffCount = count.transform{ rdd => 
                val interestingKeys = Set("hadoop", "spark")               
                val interesting = rdd.filter{case (k,v) => interestingKeys(k)}                                
                val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))}
                currentData = interesting
                countDiff                
               }

diffCount.print()
Run Code Online (Sandbox Code Playgroud)