I'm using PySpark 2.3 (I cannot update to 2.4 on my current dev system) and have the following questions concerning foreachPartition.
First, a little context: as far as I understand, PySpark UDFs force the Python code to be executed outside the Java Virtual Machine (JVM) in a separate Python instance, which costs performance.
Since I need to apply some Python functions to my data and want to minimize that overhead, my idea was to at least load a manageable chunk of data into the driver and process it as a Pandas DataFrame. However, this would sacrifice the parallelism advantage that Spark offers.
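To illustrate the driver-side idea, here is a minimal sketch of what I mean (the example data and the lower-casing step are just placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("TESTA",), ("TESTB",)], ["text"])

# Pull the (small) dataset into the driver and process it with Pandas --
# simple, but everything runs on a single machine.
pdf = df.toPandas()
pdf["text"] = pdf["text"].str.lower()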
Then I read that foreachPartition applies a function to all the data within a partition and, hence, allows parallel processing.
My questions now are:
When I apply a Python function via foreachPartition, does the Python execution take place within the driver process (so that the partition data is transferred over the network to my driver)?
Is the data processed row-wise within foreachPartition (meaning every RDD row is transferred one by one to the Python instance), or is the partition data processed at once (meaning, for example, the whole partition is transferred to the instance and handled as a whole by one Python instance)?
Thank you in advance for your input!
Edit:
A working in-driver solution I used before looks like this, taken from SO here:
for partition in rdd.mapPartitions(lambda part: [list(part)]).toLocalIterator():
    # Do stuff on the partition (a plain Python list of rows)
    pass
As can be read in the docs, rdd.toLocalIterator() provides the necessary functionality:
Return an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD.
Luckily I stumbled upon this great explanation of mapPartitions from Mrinal (answered here).
mapPartitions applies a function to each partition of an RDD. Hence, the processing can run in parallel if the partitions are distributed over different nodes. The Python instances that are needed to execute the Python functions are created on these nodes.
While foreachPartition only applies a function for its side effects (e.g. writing your data to a .csv file), mapPartitions also returns a new RDD. Therefore, using foreachPartition was the wrong choice for me.
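To make the difference concrete, here is a minimal, self-contained sketch; the print call is just a placeholder for a real side effect such as writing to a file:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize(["TESTA", "TESTB"], 2)

def save_partition(partition):
    # foreachPartition: side effects only, nothing is returned
    for row in partition:
        print(row)  # placeholder for e.g. writing to a file

def lowercase_partition(partition):
    # mapPartitions: yields results, so Spark builds a new RDD from them
    for row in partition:
        yield row.lower()

rdd.foreachPartition(save_partition)                # returns None
new_rdd = rdd.mapPartitions(lowercase_partition)    # returns a new RDD
print(new_rdd.collect())                            # ['testa', 'testb']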
To answer my second question: functions like map or UDFs create a new Python instance and pass the data from the DataFrame/RDD row by row, which causes a lot of overhead. foreachPartition and mapPartitions (both RDD functions) transfer the whole partition to the Python instance.
In addition, using a generator also reduces the amount of memory needed to iterate over this transferred partition data (the partition is handled as an iterator object, and each row is then processed by iterating over this object).
An example might look like this:
def generator(partition):
    """
    Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)
    @partition: iterator-object of partition
    """
    for row in partition:
        yield [word.lower() for word in row["text"]]

df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()
#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+
Hope this helps anyone facing a similar problem :)