在Spark Streaming中缓存DStream

Srd*_*vic 1 apache-spark spark-streaming kafka-consumer-api

我有一个Spark流传输过程,该过程将从kafka读取数据到DStream中。

在我的管道中,我做了两次(一个接一个):

DStream.foreachRDD(在RDD上转换并插入到目标中)。

(每次我进行不同的处理并将数据插入到不同的目的地)。

当我从Kafka读取数据后,我想知道DStream.cache是​​如何工作的吗?有可能做到吗?

现在该过程实际上是从Kafka读取数据两次吗?

请记住,不可能将两个foreachRDD放在一个中(因为两个路径完全不同,所以那里有状态转换-需要在DStream上应用...)

谢谢你的帮助

maa*_*asg 5

有两种选择:

  • 用于Dstream.cache()将基础RDD标记为已缓存。由spark.cleaner.ttl配置控制,Spark Streaming将在超时后不保留RDD 。

  • 对DStream中的RDD 使用附加foreachRDD的应用cache()unpersist(false)副作用操作:

例如:

val kafkaDStream = ???
val targetRDD = kafkaRDD
                       .transformation(...)
                       .transformation(...)
                       ...
// Right before the lineage fork mark the RDD as cacheable:
targetRDD.foreachRDD{rdd => rdd.cache(...)}
targetRDD.foreachRDD{do stuff 1}
targetRDD.foreachRDD{do stuff 2}
targetRDD.foreachRDD{rdd => rdd.unpersist(false)}
Run Code Online (Sandbox Code Playgroud)

请注意,如果可以do stuff 1选择的话,可以将缓存作为第一条语句。

我更喜欢此选项,因为它可以让我对缓存的生命周期进行细粒度的控制,并且可以让我在需要时立即清除内容,而不必依赖于ttl。

  • ```spark.cleaner.ttl``` 被删除。这是什么新属性控制? (2认同)