pyspark mapPartitions函数如何工作?

Met*_*est 20 python scala bigdata apache-spark

所以我试图用Python(Pyspark)学习Spark.我想知道这个功能是如何mapPartitions工作的.这就是输入它所带来的输出和输出.我在互联网上找不到任何合适的例子.可以说,我有一个包含列表的RDD对象,如下所示.

[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ] 
Run Code Online (Sandbox Code Playgroud)

我想从所有列表中删除元素2,我将如何使用它mapPartitions.

bea*_*ito 27

mapPartition应该被视为对分区的映射操作,而不是分区的元素.它的输入是当前分区的集合,其输出将是另一组分区.

您传递的函数映射必须采用RDD的单个元素

传递mapPartition的函数必须采用RDD类型的迭代,并返回和迭代某些其他或相同类型.

在你的情况下,你可能只想做类似的事情

def filter_out_2(line):
    return [x for x in line if x != 2]

filtered_lists = data.map(filterOut2)
Run Code Online (Sandbox Code Playgroud)

如果你想使用mapPartition,那就是

def filter_out_2_from_partition(list_of_lists):
  final_iterator = []
  for sub_list in list_of_lists:
    final_iterator.append( [x for x in sub_list if x != 2])
  return iter(final_iterator)

filtered_lists = data.mapPartition(filterOut2FromPartion)
Run Code Online (Sandbox Code Playgroud)


Nar*_*rek 24

使用yield语法将mapPartitions与生成器函数一起使用会更容易:

def filter_out_2(partition):
    for element in partition:
        if element != 2:
            yield element

filtered_lists = data.mapPartitions(filter_out_2)
Run Code Online (Sandbox Code Playgroud)

  • @cgreen 该分区包含您的所有数据。我不确定您是否要将所有数据加载到列表中。当您迭代数据时,生成器比列表更受青睐。 (3认同)