Evo*_*mov 3 apache-spark spark-streaming
将 DStream RDD 与 Batch RDD 结合/联合/cogroup 的唯一方法是通过“transform”方法,该方法返回另一个 DStream RDD,因此它在微批次结束时被丢弃。
有什么方法可以将 Dstream RDD 与 Batch RDD 结合起来,它会产生一个新的 Batch RDD,其中包含 DStream RDD 和 Batch RDD 的元素。
并且一旦以上述方式创建了这样的 Batch RDD,它是否可以被其他 DStream RDD 使用,例如加入,因为这次结果可以是另一个 DStream RDD
有效地,上述功能将导致 Batch RDD 元素的定期更新(添加) - 附加元素将继续来自 DStream RDD,这些 DStream RDD 会随着每个微批次不断流入。此外,新到达的 DStream RDD 将能够加入之前更新的 BATch RDD 并产生结果 DStream RDD
使用 updateStateByKey 几乎可以实现类似的功能,但是有没有一种方法可以按照此处的描述进行操作
另一种方法是将批处理输入转换为 DStream 并将其与您的流输入合并。然后您使用 foreachRDD 将它写出来,这是您对其他作业的新批量输入。
val batch = sc.textFile(...)
val ssc = new StreamingContext(sc, Seconds(30))
val stream = ssc.textFileStream(...)
import scala.collection.mutable
val batchStream = ssc.queueStream(mutable.Queue.empty[RDD[String]], oneAtATime = false, defaultRDD = batch)
val union = ssc.union(Seq(stream, batchStream))
union.print()
union.foreachRDD { rdd =>
// Delete previous, or use SchemaRDD with .insertInto(, overwrite = true)
rdd.saveTextFile(...)
}
ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1665 次 |
| 最近记录: |