man*_*ndy 5 python apache-spark rdd pyspark
是否可以在Python中扩展Spark的RDD以添加自定义运算符?如果不可能,如何为扩展RDD的类包装Scala代码,例如这里的那个:http: //blog.madhukaraphatak.com/extending-spark-api/
编辑:我正在尝试创建一个新的RDD,比如PersonRDD并在PersonRDD上添加一组新的运算符,例如.PersonRDD.computeMedianIncome().根据下面的链接,在Python中执行此操作并非易事.但是,由于它是一个旧线程,我想知道是否有任何新的更新.如果没有,我想使用Scala来做,但我不知道如何使用Py4J从Python调用该类(mail-archives.us.apache.org/mod_mbox/spark-user/201308.mbox/...)
任何建议或帮助将不胜感激.
曼迪
在分布式环境中计算精确的中位数需要一些努力,所以假设您想要对 RDD 中的所有值进行平方之类的东西。让我们调用这个方法squares并假设它应该按如下方式工作:
assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()
Run Code Online (Sandbox Code Playgroud)
pyspark.RDD定义:from pyspark import RDD
def squares(self):
return self.map(lambda x: x * x)
RDD.squares = squares
rdd = sc.parallelize([1, 2, 3])
assert rdd.squares().collect() == [1, 4, 9]
Run Code Online (Sandbox Code Playgroud)
注意:如果您修改类定义,每个实例都将可以访问squares.
class RDDWithSquares(RDD):
def squares(self):
return self.map(lambda x: x * x)
rdd = sc.parallelize([1, 2, 3])
rdd.__class__ = RDDWithSquares # WARNING: see a comment below
Run Code Online (Sandbox Code Playgroud)
分配一个类是一种肮脏的黑客行为,所以在实践中你应该以正确的方式创建一个 RDD(例如,参见context.parallelize实现)。
import types
rdd = sc.parallelize([1, 2, 3])
# Reusing squares function defined above
rdd.squares = types.MethodType(squares, rdd)
Run Code Online (Sandbox Code Playgroud)
首先,我还没有对其中任何一个进行足够长的测试,以确保其中没有隐藏的问题。
而且我认为这真的不值得大惊小怪。如果没有静态类型检查,真的很难找到任何好处,并且您可以使用函数、柯里化以及pipes以更简洁的方式获得类似的结果。
from toolz import pipe
pipe(
sc.parallelize([1, 2, 3]),
squares,
lambda rdd: rdd.collect())
Run Code Online (Sandbox Code Playgroud)