Srd*_*vic 1 apache-spark spark-streaming kafka-consumer-api
我有一个Spark流传输过程,该过程将从kafka读取数据到DStream中。
在我的管道中,我做了两次(一个接一个):
DStream.foreachRDD(在RDD上转换并插入到目标中)。
(每次我进行不同的处理并将数据插入到不同的目的地)。
当我从Kafka读取数据后,我想知道DStream.cache是如何工作的吗?有可能做到吗?
现在该过程实际上是从Kafka读取数据两次吗?
请记住,不可能将两个foreachRDD放在一个中(因为两个路径完全不同,所以那里有状态转换-需要在DStream上应用...)
谢谢你的帮助
有两种选择:
用于Dstream.cache()将基础RDD标记为已缓存。由spark.cleaner.ttl配置控制,Spark Streaming将在超时后不保留RDD 。
对DStream中的RDD 使用附加foreachRDD的应用cache()和unpersist(false)副作用操作:
例如:
val kafkaDStream = ???
val targetRDD = kafkaRDD
.transformation(...)
.transformation(...)
...
// Right before the lineage fork mark the RDD as cacheable:
targetRDD.foreachRDD{rdd => rdd.cache(...)}
targetRDD.foreachRDD{do stuff 1}
targetRDD.foreachRDD{do stuff 2}
targetRDD.foreachRDD{rdd => rdd.unpersist(false)}
Run Code Online (Sandbox Code Playgroud)
请注意,如果可以do stuff 1选择的话,可以将缓存作为第一条语句。
我更喜欢此选项,因为它可以让我对缓存的生命周期进行细粒度的控制,并且可以让我在需要时立即清除内容,而不必依赖于ttl。
| 归档时间: |
|
| 查看次数: |
3426 次 |
| 最近记录: |