use*_*681 12 postgresql apache-spark spark-streaming apache-spark-sql
我有一个来自kafka的火花流上下文读取事件数据,间隔为10秒.我想用postgres表中的现有数据来补充这个事件数据.
我可以用以下内容加载postgres表:
val sqlContext = new SQLContext(sc)
val data = sqlContext.load("jdbc", Map(
"url" -> url,
"dbtable" -> query))
Run Code Online (Sandbox Code Playgroud)
...
val broadcasted = sc.broadcast(data.collect())
Run Code Online (Sandbox Code Playgroud)
后来我可以像这样穿过它:
val db = sc.parallelize(data.value)
val dataset = stream_data.transform{ rdd => rdd.leftOuterJoin(db)}
Run Code Online (Sandbox Code Playgroud)
我希望保持当前的数据流运行,并且每隔6小时仍然重新加载此表.由于此时apache spark不支持多个运行上下文,我该如何实现呢?有没有解决方法?或者每次我想重新加载数据时是否需要重新启动服务器?这似乎是一个简单的用例......:/
以我的拙见,在 DStreams 上的转换期间重新加载另一个数据源在设计上是不建议的。
与传统的stateful流处理模型相比,D-Streams 旨在将流计算构建为一系列小时间间隔的批量 stateless计算。deterministic
DStream 上的转换是确定性的,这种设计可以通过重新计算从故障中快速恢复。刷新会给恢复/重新计算带来副作用。
一种解决方法是将查询推迟到输出操作,例如:foreachRDD(func)。