如何理解apache Spark中的queueStream API?

Kra*_* Li 5 apache-spark

pyspark有一个apiqueueStream,用于从一系列rdd构造dstream。

\n\n
queueStream(rdds, oneAtATime=True, default=None)\nCreate an input stream from an queue of RDDs or list. In each batch, it will process either one or all of the RDDs returned by the queue.\n\nNOTE: changes to the queue after the stream is created will not be recognized.\n\nParameters: \nrdds \xe2\x80\x93 Queue of RDDs\noneAtATime \xe2\x80\x93 pick one rdd each time or pick all of them once.\ndefault \xe2\x80\x93 The default rdd if no more in rdds\n
Run Code Online (Sandbox Code Playgroud)\n\n

问题一:

\n\n

在分布式环境中,如果我定义一个队列对象q1。我执行像 q1.add(RDD) 这样的操作。q1对象会传输到所有工作节点吗?如果将该对象复制到其他节点,q1.add(RDD)操作会出现问题吗?

\n\n

问题2:

\n\n

在我运行 dstream =queueStream(q1) 后。\n如果我继续将RDD放入队列中。这些RDDS会被添加到dstream中吗?

\n

zer*_*323 4

我相信以下注意事项:

创建流后对队列的更改将不会被识别。

几乎回答了这个问题,但要理解为什么会出现这种情况,您必须查看 PySpark 代码,特别是以下行

queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
Run Code Online (Sandbox Code Playgroud)

如果这还不够,你可以看一下相应的 Scala 代码,看看它需要一个静态列表:

def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]])
Run Code Online (Sandbox Code Playgroud)

并将其转换为队列:

val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
rdds.asScala.foreach(queue.add)
Run Code Online (Sandbox Code Playgroud)

因此,Python 端的任何更改根本无法反映在流中。

对于第一个问题,答案是否定的。队列不会被分布式,因为 RDD 在外部Driver上下文中根本没有意义。

笔记

需要明确的是,ScalaqueueStream将反映队列中的添加情况。Spark 源代码中甚至还有一个示例