s11*_*230 8 apache-spark spark-structured-streaming
我有一个从S3读取的Spark Structured Streaming Job,转换数据然后将其存储到一个S3接收器和一个Elasticsearch接收器.
目前,我做了readStream一次,然后 writeStream.format("").start()两次.这样做似乎Spark从S3源读取数据两次,每个接收器一次.
是否有更有效的方法写入同一管道中的多个接收器?
目前,我正在做readStream一次,然后两次writeStream.format("").start().
您实际创建了两个单独的流式查询.所述load双组分是描述第一(只)数据流源.这没有执行任何操作.
这样做似乎Spark从每个接收器的S3源读取数据两次.
这是描述Spark Structured Streaming查询如何工作的最正确方法.接收器的数量对应于查询的数量,因为一个流式查询可以只有一个流式接收器(请参阅位于任何流式查询后面的StreamExecution).
您还可以检查线程数(使用jconsole或类似),因为Structured Streaming microBatchThread每个流查询使用一个线程(请参阅StreamExecution).
是否有更有效的方法写入同一管道中的多个接收器?
这是不是可以在星火的当前设计结构化数据流.
您要做的是cache()读取一次并多次使用后的数据。我不认为Spark结构化流当前支持缓存(请参阅此处),但是您可以使用Spark流。与结构化流相比(与数据帧/数据集相比,使用底层RDD),它是一个较低级别的API。从Spark Streaming 文档中:
与RDD相似,DStreams还允许开发人员将流的数据持久存储在内存中。也就是说,在DStream上使用persist()方法将自动将该DStream的每个RDD持久保存在内存中。如果DStream中的数据将被多次计算(例如,对同一数据进行多次操作),这将很有用。
使用Spark Streaming API,您可以Dstream.cache()在数据上使用。这会将基础的RDD标记为已缓存,这应防止再次读取。超时后,Spark Streaming将自动取消保留RDD,您可以使用该spark.cleaner.ttl设置来控制行为。请注意,默认值为无穷大,我不建议在生产设置中使用。
除了使用Dstream.cache()需要等待spark.cleaner.ttl超时的位置之外,还有另一种缓存数据的方法。可以用来foreachRDD直接访问基础的RDD。使用后,可以直接将RDD取消缓存。
dstream.foreachRDD{rdd =>
rdd.cache()
// perform any transormations, etc.
rdd.saveAs(...)
rdd.unpersist(true)
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2654 次 |
| 最近记录: |