小编use*_*453的帖子

如何在Apache Spark(pyspark)中使用自定义类?

我在python中编写了一个实现分类器的类.我想使用Apache Spark来使用此分类器并行化大量数据点的分类.

  1. 我在一个拥有10个奴隶的集群上使用Amazon EC2进行设置,基于python的Anaconda发行版附带的ami.ami让我可以远程使用IPython Notebook.
  2. 我已经在文件调用BoTree.py中的文件调用BoTree中定义了文件名/root/anaconda/lib/python2.7/中的主文件,这是我所有的python模块都是
  3. 我已经检查过我可以在从主服务器运行命令行spark时导入和使用BoTree.py(我只需要从编写导入BoTree开始,我的类BoTree就可以了
  4. 我使用spark的/root/spark-ec2/copy-dir.sh脚本在我的集群中复制/python2.7/目录.
  5. 我已经深入了解其中一个奴隶并尝试在那里运行ipython,并且能够导入BoTree,所以我认为该模块已成功发送到集群中(我还可以看到...中的BoTree.py文件) ./python2.7/文件夹)
  6. 在主人身上我已经检查过我可以使用cPickle来挑选和解开一个BoTree实例,我理解这是pyspark的序列化器.

但是,当我执行以下操作时:

import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()
Run Code Online (Sandbox Code Playgroud)

Spark失败并出现错误(我认为只是相关的一点):

  File "/root/spark/python/pyspark/worker.py", line 90, in main
    command = pickleSer.loads(command.value)
  File "/root/spark/python/pyspark/serializers.py", line 405, in loads
    return cPickle.loads(obj)
ImportError: No module named BoroughTree
Run Code Online (Sandbox Code Playgroud)

谁能帮我?有点绝望......

谢谢

python python-module apache-spark pyspark

18
推荐指数
2
解决办法
9368
查看次数

具有交互式x轴范围的ggvis折线图

我希望能够

  1. 在ggvis中绘制折线图
  2. 添加两个交互式控件,允许我在图表上设置最小和最大x值

这听起来很简单 - 我的代码是:

minx = minx = input_numeric(1, 'Min x-val')
maxx = input_numeric(1, 'Max x-val')

data.frame(train.dt) %>% 
ggvis(x = ~plot_idx, y = ~val) %>%
layer_lines() %>% add_axis('x') %>%
scale_numeric('x', domain = c(minx, maxx), clamp = T)
Run Code Online (Sandbox Code Playgroud)

但是,这不起作用.我收到此错误消息:

"r [i1]中的错误 - r [-length(r):-(length(r) - lag + 1L)]:二元运算符的非数字参数".

如果我用例如1和10替换domain参数中的minx和maxx,我的图表就可以很好地绘制(但是是静态的).有任何想法吗?

谢谢!

r data-visualization ggplot2 ggvis

5
推荐指数
1
解决办法
1442
查看次数

为什么当RDD包含用户定义的类时,Apache PySpark top()会失败?

我正在使用Apache Spark的PySpark在本地机器上通过iPython Notebook对一些代码进行原型设计.我写了一些似乎工作得很好的代码,但是当我对它进行简单的更改时,它会中断.

下面的第一个代码块有效.第二个块因给定错误而失败.真的很感激任何帮助.我怀疑这个错误与序列化Python对象有关.错误说它不能Pickle TestClass.我无法找到有关如何使我的班级发泡的信息.文档说"通常你可以挑选任何对象,如果你可以挑选该对象的每个属性.类,函数和方法不能被腌制 - 如果你挑选一个对象,对象的类不被pickle,只是一个字符串,标识什么这对于大多数泡菜来说都很好(但请注意关于长期存放泡菜的讨论)." 我不明白这一点,因为我已经尝试用datetime类替换我的TestClass,事情似乎工作正常.

无论如何,代码:

# ----------- This code works -----------------------------
class TestClass(object):
    def __init__(self):
        self.teststr = 'Hello'
    def __str__(self):
        return self.teststr
    def __repr__(self):
        return self.teststr
    def test(self):
        return 'test: {0}'.format(self.teststr)

#load multiple text files into list of RDDs, concatenate them, then remove headers
trip_rdd  = trip_rdds[0]
for rdd in trip_rdds[1:]:
    trip_rdd = trip_rdd.union(rdd)

#filter out header rows from result
trip_rdd = trip_rdd.filter(lambda r: r != header)

#split the line, then convert each element to a …
Run Code Online (Sandbox Code Playgroud)

python serialization pickle apache-spark pyspark

5
推荐指数
1
解决办法
1328
查看次数