在Shuffle中Spark节点如何通信?

Cha*_*nce 6 apache-spark

我从这个问题中看到Spark节点有效地"直接通信",但我不太关心理论,而是更多关注实现.这里显示,在页面底部附近的"### Encryption"部分中,您可以将Spark配置为使用多个SSL协议来保证安全性,这至少对我来说,它建议使用某种形式的用于通信的HTTP.我的问题实际上有两个部分:Spark节点使用什么协议进行通信,以及为此传输格式化的数据如何?

Yuv*_*kov 7

Spark使用RPC(Netty)在执行程序进程之间进行通信.您可以查看NettyRpcEndpointRef该类以查看实际的实现.

对于混洗数据,我们从BlockManager负责提供数据块的开始.每个执行程序进程都有一个.在内部,BlockStoreShuffleReader它使用a来管理来自不同执行程序的读取SerializerManager.该管理器包含一个实际的序列化程序,该序列化程序由spark.serializer属性定义:

val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
Run Code Online (Sandbox Code Playgroud)

BlockManager试图读取块时,它使用从底层配置串行器.它可以是a KryoSerializer或a JavaSerializer,具体取决于您的设置.

底线,用于读取和编写混洗数据Spark使用用户定义的序列化器.


对于任务序列化,这有点不同.

Spark使用一个名为的变量closureSerializer,默认JavaSerializerInstance为Java序列化.你可以在DAGScheduler.submitMissingTasks方法中看到这个:

val taskBinaryBytes: Array[Byte] = stage match {
  case stage: ShuffleMapStage =>
    JavaUtils.bufferToArray(
      closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
  case stage: ResultStage =>
      JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
Run Code Online (Sandbox Code Playgroud)

将序列化并发送到每个执行程序的实际对象称为TaskDescription:

def encode(taskDescription: TaskDescription): ByteBuffer = {
  val bytesOut = new ByteBufferOutputStream(4096)
  val dataOut = new DataOutputStream(bytesOut)

  dataOut.writeLong(taskDescription.taskId)
  dataOut.writeInt(taskDescription.attemptNumber)
  dataOut.writeUTF(taskDescription.executorId)
  dataOut.writeUTF(taskDescription.name)
  dataOut.writeInt(taskDescription.index)

  // Write files.
  serializeStringLongMap(taskDescription.addedFiles, dataOut)

  // Write jars.
  serializeStringLongMap(taskDescription.addedJars, dataOut)

  // Write properties.
  dataOut.writeInt(taskDescription.properties.size())
  taskDescription.properties.asScala.foreach { case (key, value) =>
    dataOut.writeUTF(key)
    // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    dataOut.writeInt(bytes.length)
    dataOut.write(bytes)
  }

  // Write the task. The task is already serialized, so write it directly to the byte buffer.
  Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)

  dataOut.close()
  bytesOut.close()
  bytesOut.toByteBuffer
}
Run Code Online (Sandbox Code Playgroud)

并从CoarseGrainedSchedulerBackend.launchTasks方法中通过RPC发送:

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
Run Code Online (Sandbox Code Playgroud)

到目前为止我所展示的内容都是关于启动任务的.为了改组数据,Spark拥有一个BlockStoreShuffleReader管理来自不同执行程序的读取.