The PySpark documentation describes two functions:
```
mapPartitions(f, preservesPartitioning=False)

    Return a new RDD by applying a function to each partition of this RDD.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
```
And...
```
mapPartitionsWithIndex(f, preservesPartitioning=False)

    Return a new RDD by applying a function to each partition of this RDD,
    while tracking the index of the original partition.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithIndex(f).sum()
    6
```
What use cases are these functions meant to address? I don't understand why they would be needed.
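From what I can tell, a typical pattern looks something like the sketch below (the function names `score_batch` and `tag_with_partition` are just placeholders I made up; I'm not sure this is the intended use case): `mapPartitions` lets you pay a per-partition setup cost once instead of once per record, and `mapPartitionsWithIndex` additionally tells you which partition you are processing.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("mapPartitions-demo").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(10), 3)

def score_batch(iterator):
    # Expensive setup (loading a model, opening a connection, ...) happens
    # once per partition here, not once per element as it would with map().
    model = lambda x: x * 2          # stand-in for a real model
    for x in iterator:
        yield model(x)

def tag_with_partition(index, iterator):
    # The partition index could be used to write each partition to its own
    # file, or to process only certain partitions.
    for x in iterator:
        yield (index, x)

print(rdd.mapPartitions(score_batch).collect())          # [0, 2, 4, ..., 18]
print(rdd.mapPartitionsWithIndex(tag_with_partition).collect())  # [(0, 0), (0, 1), ...]
```

Is that roughly the point of these methods, or is there more to it?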
I need to collect partitions/batches from a large PySpark DataFrame so that I can feed them into a neural network iteratively.
My idea is to 1) partition the data, 2) collect each partition iteratively, and 3) convert each collected partition with toPandas().
I'm a bit confused by the similar methods foreachPartition and mapPartitions, because I can't iterate over them. Any ideas?
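One approach I considered for the three steps above (a rough sketch only; the helper `keep_only` and the batch sizes are my own assumptions, and I don't know whether this is idiomatic) is to use `mapPartitionsWithIndex` to keep a single partition per pass and collect it into a pandas frame:

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("partition-batches").getOrCreate()

df = spark.range(0, 1000).repartition(10)   # 1) partition the data
rdd = df.rdd.cache()                        # cache so each pass doesn't recompute the source
num_partitions = rdd.getNumPartitions()

def keep_only(target):
    # Keep rows only from the target partition; empty out all others,
    # so a single collect() brings back just one partition's rows.
    def f(index, iterator):
        return iterator if index == target else iter([])
    return f

for i in range(num_partitions):             # 2) collect each partition iteratively
    rows = rdd.mapPartitionsWithIndex(keep_only(i)).collect()
    pdf = pd.DataFrame([r.asDict() for r in rows])   # 3) pandas frame per partition
    # ... feed `pdf` to the neural network here ...
    print(i, len(pdf))
```

The obvious downside is that every iteration launches a full job over all partitions (caching softens this but doesn't remove it). Would something like `df.toLocalIterator()`, which streams rows back to the driver, be the better fit here, or is the per-partition collect approach reasonable?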