在Python中创建自定义Spark RDD

man*_*ndy 5 python apache-spark rdd pyspark

是否可以在Python中扩展Spark的RDD以添加自定义运算符?如果不可能,如何为扩展RDD的类包装Scala代码,例如这里的那个:http: //blog.madhukaraphatak.com/extending-spark-api/

编辑:我正在尝试创建一个新的RDD,比如PersonRDD并在PersonRDD上添加一组新的运算符,例如.PersonRDD.computeMedianIncome().根据下面的链接,在Python中执行此操作并非易事.但是,由于它是一个旧线程,我想知道是否有任何新的更新.如果没有,我想使用Scala来做,但我不知道如何使用Py4J从Python调用该类(mail-archives.us.apache.org/mod_mbox/spark-user/201308.mbox/...)

任何建议或帮助将不胜感激.

曼迪

zer*_*323 4

在分布式环境中计算精确的中位数需要一些努力,所以假设您想要对 RDD 中的所有值进行平方之类的东西。让我们调用这个方法squares并假设它应该按如下方式工作:

assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()
Run Code Online (Sandbox Code Playgroud)

1.修改pyspark.RDD定义:

from pyspark import RDD

def squares(self):
    return self.map(lambda x: x * x)

RDD.squares = squares
rdd = sc.parallelize([1, 2, 3])
assert rdd.squares().collect() == [1, 4, 9]
Run Code Online (Sandbox Code Playgroud)

注意:如果您修改类定义,每个实例都将可以访问squares.

2.创建RDD子类:

class RDDWithSquares(RDD):
    def squares(self):
        return self.map(lambda x: x * x)

rdd = sc.parallelize([1, 2, 3])
rdd.__class__ = RDDWithSquares # WARNING: see a comment below
Run Code Online (Sandbox Code Playgroud)

分配一个类是一种肮脏的黑客行为,所以在实践中你应该以正确的方式创建一个 RDD(例如,参见context.parallelize实现)。

3. 给实例添加方法

import types

rdd = sc.parallelize([1, 2, 3])
# Reusing squares function defined above
rdd.squares = types.MethodType(squares, rdd)
Run Code Online (Sandbox Code Playgroud)

免责声明

首先,我还没有对其中任何一个进行足够长的测试,以确保其中没有隐藏的问题。

而且我认为这真的不值得大惊小怪。如果没有静态类型检查,真的很难找到任何好处,并且您可以使用函数、柯里化以及pipes以更简洁的方式获得类似的结果。

from toolz import pipe
pipe(
    sc.parallelize([1, 2, 3]),
    squares,
    lambda rdd: rdd.collect())
Run Code Online (Sandbox Code Playgroud)