Met*_*est 20 python scala bigdata apache-spark
所以我试图用Python(Pyspark)学习Spark.我想知道这个功能是如何mapPartitions工作的.这就是输入它所带来的输出和输出.我在互联网上找不到任何合适的例子.可以说,我有一个包含列表的RDD对象,如下所示.
[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ]
Run Code Online (Sandbox Code Playgroud)
我想从所有列表中删除元素2,我将如何使用它mapPartitions.
bea*_*ito 27
mapPartition应该被视为对分区的映射操作,而不是分区的元素.它的输入是当前分区的集合,其输出将是另一组分区.
您传递的函数映射必须采用RDD的单个元素
传递mapPartition的函数必须采用RDD类型的迭代,并返回和迭代某些其他或相同类型.
在你的情况下,你可能只想做类似的事情
def filter_out_2(line):
return [x for x in line if x != 2]
filtered_lists = data.map(filterOut2)
Run Code Online (Sandbox Code Playgroud)
如果你想使用mapPartition,那就是
def filter_out_2_from_partition(list_of_lists):
final_iterator = []
for sub_list in list_of_lists:
final_iterator.append( [x for x in sub_list if x != 2])
return iter(final_iterator)
filtered_lists = data.mapPartition(filterOut2FromPartion)
Run Code Online (Sandbox Code Playgroud)
Nar*_*rek 24
使用yield语法将mapPartitions与生成器函数一起使用会更容易:
def filter_out_2(partition):
for element in partition:
if element != 2:
yield element
filtered_lists = data.mapPartitions(filter_out_2)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
27961 次 |
| 最近记录: |