nev*_*_me 7 python apache-spark pyspark
我正在将一些用 Pandas 编写的代码转换为 PySpark。该代码有很多for循环来根据用户指定的输入创建可变数量的列。
我使用的是 Spark 1.6.x,示例代码如下:
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
# create a Pandas DataFrame, then convert to Spark DataFrame
test = sqlContext.createDataFrame(pd.DataFrame({'val1': np.arange(1,11)}))
Run Code Online (Sandbox Code Playgroud)
这给我留下了
+----+
|val1|
+----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
+----+
Run Code Online (Sandbox Code Playgroud)
我在代码中循环了很多,例如下面的:
for i in np.arange(2,6).tolist():
test = test.withColumn('val_' + str(i), F.lit(i ** 2) + test.val1)
Run Code Online (Sandbox Code Playgroud)
结果是:
+----+-----+-----+-----+-----+
|val1|val_2|val_3|val_4|val_5|
+----+-----+-----+-----+-----+
| 1| 5| 10| 17| 26|
| 2| 6| 11| 18| 27|
| 3| 7| 12| 19| 28|
| 4| 8| 13| 20| 29|
| 5| 9| 14| 21| 30|
| 6| 10| 15| 22| 31|
| 7| 11| 16| 23| 32|
| 8| 12| 17| 24| 33|
| 9| 13| 18| 25| 34|
| 10| 14| 19| 26| 35|
+----+-----+-----+-----+-----+
Run Code Online (Sandbox Code Playgroud)
**问题:** 如何重写上述循环以提高效率?
我注意到我的代码运行速度较慢,因为 Spark 在每组循环上花费了大量时间(即使是在像 2GB 的文本输入这样的小数据集上)。
谢谢
重复调用 JVM 方法的开销很小,否则单独的 for 循环应该不成问题。您可以使用单个选择稍微改进它:
df = spark.range(1, 11).toDF("val1")
def make_col(i):
return (F.pow(F.lit(i), 2) + F.col("val1")).alias("val_{0}".format(i))
spark.range(1, 11).toDF("val1").select("*", *(make_col(i) for i in range(2, 6)))
Run Code Online (Sandbox Code Playgroud)
我也会避免使用 NumPy 类型。与普通 Python 对象相比,初始化 NumPy 对象通常更昂贵,并且 Spark SQL 不支持 NumPy 类型,因此需要一些额外的转换。