Apache Spark如何将功能发送到引擎盖下的其他机器

use*_*279 6 python java scala apache-spark pyspark

我开始玩Pyspark来做一些数据处理.我很有意思,我可以做类似的事情

rdd.map(lambda x : (x['somekey'], 1)).reduceByKey(lambda x,y: x+y).count()
Run Code Online (Sandbox Code Playgroud)

并且它会将这些函数中的逻辑发送到可能众多的机器上以并行执行.

现在,来自Java背景,如果我想将包含某些方法的对象发送到另一台机器,那么该机器需要知道通过网络流式传输的对象的类定义.最近java有了Functional Interfaces的想法,它会在编译时为我创建该接口的实现(即MyInterface impl =() - > System.out.println("Stuff");)

MyInterface只有一个方法,'doStuff()'

但是,如果我想通过线路发送这样的函数,目标机器需要知道实现(impl本身)才能调用它的'doStuff()'方法.

我的问题归结为......用Scala编写的Spark如何实际向其他机器发送功能?我有几个人:

  1. 驱动程序将类定义传递给其他机器,这些机器使用类加载器动态加载它们.然后驱动程序对对象进行流式处理,并且机器知道它们是什么,并且可以对它们执行.
  2. Spark在所有机器(核心库)上定义了一组方法,这些方法都是我可以通过的所有机器所需要的.也就是说,我传递的函数被转换为核心库上的一个或多个函数调用.(似乎不太可能,因为lambda几乎可以做任何事情,包括在其中实例化其他对象)

谢谢!

编辑:Spark是用Scala编写的,但我有兴趣听听如何在Java中解决这个问题(除非它在一个类中,否则函数不能存在,从而改变需要在工作节点上更新的类定义).

编辑2:这是java中的问题,如果出现混淆:

public class Playground
{
    private static interface DoesThings
    {
        public void doThing();
    }
    public void func() throws Exception {
        Socket s = new Socket("addr", 1234);
        ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
        oos.writeObject("Hello!"); // Works just fine, you're just sending a string
        oos.writeObject((DoesThings)()->System.out.println("Hey, im doing a thing!!")); // Sends the object, but error on other machine

        DoesThings dt = (DoesThings)()->System.out.println("Hey, im doing a thing!!");
        System.out.println(dt.getClass());
    }
}
Run Code Online (Sandbox Code Playgroud)

System.out,println(dt.getClass())返回:"class JohnLibs.Playground $$ Lambda $ 1/23237446"

现在,假设接口定义不在同一个文件中,它在两台机器的共享文件中.但是这个驱动程序func()实际上创建了一个实现DoesThings的新类.正如您所看到的,目标计算机不会知道JohnLibs.Playground $$ Lambda $ 1/23237446是什么,即使它知道DoesThings是什么.这一切都归结为你不能通过一个函数,而不受它的约束.在python中,您可以发送带有定义的String,然后执行该字符串(自解释后).也许这就是火花的作用,因为它使用scala而不是java(如果scala可以在类之外使用函数)

Tob*_*itt 5

当然,Java 字节码是 Java 和 Scala 编译后的目标,它是专门为独立于平台而创建的。因此,如果您有一个类文件,您可以将它移动到任何其他机器上,无论“硅”架构如何,并且只要它具有至少该版本的 JVM,它就会运行。James Gosling 和他的团队故意这样做是为了让代码从一开始就在机器之间移动,并且在 Java 0.98(我玩过的第一个版本)中很容易演示。

当 JVM 尝试加载一个类时,它使用 ClassLoader 的一个实例。类加载器包含两件事,获取字节码文件的二进制文件的能力,以及加载代码的能力(验证其完整性,将其转换为 java.lang.Class 的内存实例,并使其可用于其他代码系统中)。在 Java 1 中,如果您想控制如何加载字节,您通常必须编写自己的类加载器,尽管有一个特定于 sun 的 AppletClassLoader,它是为从 http 而不是从文件系统加载类文件而编写的。

稍后,在 Java 1.2 中,“如何获取类文件的字节”部分在 URLClassloader 中分离出来。这可以使用任何支持的协议来加载类。事实上,协议支持机制过去和现在都可以通过可插拔协议处理程序进行扩展。所以,现在您可以从任何地方加载类,而不会在更难的部分出错,这就是您验证和安装类的方式。

除此之外,Java 的 RMI 机制允许将序列化对象(类名以及对象的“状态”部分)包装在 MarshaledObject 中。这添加了“可以从何处加载此类”,表示为 URL。RMI 自动将内存中的真实对象转换为 MarshaledObjects,并在网络上传送它们。如果 JVM 接收到它已经具有类定义的封送对象,它总是使用该类定义(为了安全)。但是,如果没有,那么如果满足一堆标准(安全性,并且只是正常工作,标准),则可以从该远程服务器加载类文件,从而允许 JVM 加载它从未见过定义的类。(明显地,

现在,我不知道(事实上,我发现你的问题试图确定 Spark 是否使用 RMI 基础设施(我确实知道 hadoop 没有,因为,似乎是因为作者想要创建自己的系统——这是当然是有趣和有教育意义的——而不是使用灵活的、可配置的、经过广泛测试的,包括安全测试的系统!)

但是,要使这项工作大体上进行,所要做的就是我为 RMI 概述的步骤,这些要求本质上是:

1) 对象可以被序列化成某种所有参与者都能理解的字节序列格式

2) 当对象通过线路发送时,接收端必须有某种方式来获取定义它们的类文件。这可以是 a) 预安装,b) RMI 的“在这里找到它”的方法或 c) 发送系统发送 jar。这些都可以工作

3) 可能应该维护安全性。在 RMI 中,这个要求相当“在你面前”,但我在 Spark 中没有看到它,所以他们要么隐藏了配置,要么只是修复了它可以做什么。

无论如何,这不是一个真正的答案,因为我用一个具体的例子描述了原则,但不是你问题的实际具体答案。我还是想找!


puh*_*len 2

当您向集群提交 Spark 应用程序时,您的代码将部署到所有工作节点,因此您的类和函数定义存在于所有节点上。