foreachRDD是否在驱动程序上执行?

Nit*_*mar 10 apache-spark spark-streaming

我正在尝试使用Spark Streaming处理在JMS队列(QPID)上接收的一些XML数据.在将xml作为DStream获取之后,我将它们转换为Dataframes,这样我就可以加入一些已经加载的Dataframes形式的静态数据.但是根据DStream上foreachRdd方法的API文档:它在Driver上执行,所以这意味着所有处理逻辑只能在Driver上运行而不会分发给worker/executor.

API文档

foreachRDD(func)

最通用的输出运算符,它将函数func应用于从流生成的每个RDD.此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库.请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,这将强制计算流式RDD.

Yuv*_*kov 12

这是否意味着所有处理逻辑只能在Driver上运行而不会分发给worker/executor.

不,函数本身在驱动程序上运行,但不要忘记它在一个驱动程序上运行RDD.你会在使用的内部功能RDD,如foreachPartition,map,filter等会还是工人节点上运行.这不会导致所有数据通过网络发送回驱动程序,除非您调用类似的方法collect.


小智 5

为了清楚起见,如果运行以下命令,您将在驱动程序的标准输出上看到“ monkey”:

myDStream.foreachRDD { rdd =>
  println("monkey")
}
Run Code Online (Sandbox Code Playgroud)

如果运行以下命令,则会在驱动程序的标准输出上看到“ monkey”,并且过滤工作将在rdd分布于其上的所有执行程序上完成:

myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!")
}
Run Code Online (Sandbox Code Playgroud)

让我们添加一个简化方案,即它myDStream仅接收一个RDD,并且该RDD分布在运行PartitionSetAMachineSetB何处的一组分区(我们称之为分区)ExecutorSetC上。如果运行以下命令,则会在驱动程序的标准输出上看到“ monkey”,在所有执行程序的标准输出上会看到“ turtle” ExecutorSetC(“ turtle”在每个分区上都会出现一次-许多分区可能在计算机上,一个执行程序正在运行),那么过滤器和加法操作的工作将跨ExecutorSetC

myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!")
  rdd.foreachPartition { partition =>
    println("turtle")
    val x = 1 + 1
  }
}
Run Code Online (Sandbox Code Playgroud)

还有一点要注意的是,在下面的代码中,y最终将通过驱动程序通过网络发送ExecutorSetC给每个驱动程序rdd

val y = 2
myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!")
  rdd.foreachPartition { partition =>
    println("turtle")
    val x = 1 + 1
    val z = x + y
  }
}
Run Code Online (Sandbox Code Playgroud)

为了避免这种开销,您可以使用广播变量,该变量仅将一次值从驱动程序发送到执行程序。例如:

val y = 2
val broadcastY = sc.broadcast(y)
myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!")
  rdd.foreachPartition { partition =>
    println("turtle")
    val x = 1 + 1
    val z = x + broadcastY.value
  }
}
Run Code Online (Sandbox Code Playgroud)

要通过广播变量发送更复杂的内容,例如实例化后不容易序列化的对象,可以查看以下博客文章:https : //allegro.tech/2015/08/spark-kafka-integration.html