Spark Streaming:如何定期刷新缓存的RDD?

lai*_*ech 8 apache-spark spark-streaming

在我的Spark流应用程序中,我想根据从后端(ElasticSearch)检索的字典映射值.我想定期刷新字典,以防它在后端更新.它类似于Logstash转换过滤器的定期刷新功能.我怎么能用Spark实现这个目标(例如,每隔30秒以某种方式解决RDD问题)?

maa*_*asg 10

我发现这样做的最好方法是重新创建RDD并保持对它的可变引用.Spark Streaming的核心是Spark之上的调度框架.我们可以在调度程序上捎带以定期刷新RDD.为此,我们使用一个空的DStream,我们只为刷新操作安排:

def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data
val dstream = ??? // our data stream

// a dstream of empty data
val refreshDstream = new  ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval))

var referenceData = getData()
referenceData.cache()
refreshDstream.foreachRDD{_ => 
    // evict the old RDD from memory and recreate it
    referenceData.unpersist(true)
    referenceData = getData()
    referenceData.cache()
}

val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...
Run Code Online (Sandbox Code Playgroud)

在过去,我也只尝试过交错cache()unpersist()没有结果(只刷新一次).重新创建RDD会删除所有沿袭并提供新数据的干净加载.