带有额外参数的映射分区 pyspark

Har*_*ish 5 python pyspark

我想从 mappartition 向 python 函数传递一些额外的参数。有什么建议..

我的示例代码如下所示

 def test(x,abc):
   <<code>>

 abc =1234
 df = df.repartition("key")
 res= df.rdd.mapPartitions(test, abc)
Run Code Online (Sandbox Code Playgroud)

如果我传递 abc 作为参数并在测试函数中使用它,我会收到以下错误

例外:您似乎正在尝试广播 RDD 或从操作或转换引用 RDD。RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x: rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。有关更多信息,请参阅 SPARK-5063。

Mariusz 请找到零钱

from pyspark.sql import Row
def test(abc):
    def my_map_partitions(x):
       print("----------start-----------")
       cnt=1
       ret = []
       for i in x:
         cnt=cnt+1
         val = Row(key1=i.key1, key2=i.key2, cnt=cnt)
         ret.append(val)
       return ret 
    return my_map_partitions
df = df.repartition("key1key2").sortWithinPartitions("key1key2")  
abc123 = df .rdd.mapPartitions(test(abc)) 
Run Code Online (Sandbox Code Playgroud)

Mar*_*usz 0

尝试创建返回函数的函数,例如:

def test(abc):
    def my_map_partitions(partition):
        ...do something with partition and abc...
    return my_map_partitions

df.rdd.mapPartitions(test(abc))
Run Code Online (Sandbox Code Playgroud)