如何在Apache Spark(pyspark)中使用自定义类?

use*_*453 18 python python-module apache-spark pyspark

我在python中编写了一个实现分类器的类.我想使用Apache Spark来使用此分类器并行化大量数据点的分类.

  1. 我在一个拥有10个奴隶的集群上使用Amazon EC2进行设置,基于python的Anaconda发行版附带的ami.ami让我可以远程使用IPython Notebook.
  2. 我已经在文件调用BoTree.py中的文件调用BoTree中定义了文件名/root/anaconda/lib/python2.7/中的主文件,这是我所有的python模块都是
  3. 我已经检查过我可以在从主服务器运行命令行spark时导入和使用BoTree.py(我只需要从编写导入BoTree开始,我的类BoTree就可以了
  4. 我使用spark的/root/spark-ec2/copy-dir.sh脚本在我的集群中复制/python2.7/目录.
  5. 我已经深入了解其中一个奴隶并尝试在那里运行ipython,并且能够导入BoTree,所以我认为该模块已成功发送到集群中(我还可以看到...中的BoTree.py文件) ./python2.7/文件夹)
  6. 在主人身上我已经检查过我可以使用cPickle来挑选和解开一个BoTree实例,我理解这是pyspark的序列化器.

但是,当我执行以下操作时:

import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()
Run Code Online (Sandbox Code Playgroud)

Spark失败并出现错误(我认为只是相关的一点):

  File "/root/spark/python/pyspark/worker.py", line 90, in main
    command = pickleSer.loads(command.value)
  File "/root/spark/python/pyspark/serializers.py", line 405, in loads
    return cPickle.loads(obj)
ImportError: No module named BoroughTree
Run Code Online (Sandbox Code Playgroud)

谁能帮我?有点绝望......

谢谢

zer*_*323 15

可能最简单的解决方案是pyFiles在创建时使用参数SparkContext

from pyspark import SparkContext
sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py'])
Run Code Online (Sandbox Code Playgroud)

放在那里的每个文件都会发送给工人并添加到PYTHONPATH.

如果您在交互模式下工作,则必须sc.stop()在创建新上下文之前停止使用现有上下文.

还要确保Spark worker实际上使用的是Anaconda发行版,而不是默认的Python解释器.根据您的描述,这很可能是问题所在.设置PYSPARK_PYTHON您可以使用conf/spark-env.sh文件.

在侧面说明复制文件lib是一个相当混乱的解决方案.如果你想避免使用文件,pyFiles我建议你创建普通的Python包或Conda包以及正确的安装.通过这种方式,您可以轻松跟踪已安装的内容,删除不必要的软件包并避免一些难以调试的问题.


dmb*_*ker 11

获取SparkContext后,也可以使用addPyFile随后将模块发送给每个工作者.

sc.addPyFile('/path/to/BoTree.py')
Run Code Online (Sandbox Code Playgroud)

pyspark.SparkContext.addPyFile(path)文档