在 Spark 上过滤数据帧的有效方法?

Jon*_*n.. 2 apache-spark apache-spark-sql pyspark

Pyspark 程序.....

df [df ["timeDiff"] <= 30]
        or
df.filter(df["timeDiff"] <= 30)
Run Code Online (Sandbox Code Playgroud)

两个代码给出了相同的结果。但是有人可以解释一下在 Spark 分布式环境中哪个更有效。或者参考一些文件。我尝试在 stackoverflow 上搜索但没有成功....

小智 6

在生成的执行计划方面,两者完全相同,因此您可以使用任何您喜欢的方式 - 不会有任何性能差异。

然而,后一种方法是惯用的方法,大多数示例、教程和项目都会使用这种方法。此外,它与 Scala API 几乎相同。所以通常最好减少开发工作。


mar*_*oyo 6

加起来@user10954945,以下是两者的执行计划:

import pyspark
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)

df = spark.createDataFrame(((1,), (2,)), ['timeDiff'])
filtered_1 = df[df["timeDiff"] <= 30]
filtered_2 = df.filter(df["timeDiff"] <= 30)

filtered_1.explain()

== Physical Plan ==    
*(1) Filter (isnotnull(timeDiff#6L) && (timeDiff#6L <= 30))
+- Scan ExistingRDD[timeDiff#6L]

filtered_2.explain()

== Physical Plan ==
*(1) Filter (isnotnull(timeDiff#6L) && (timeDiff#6L <= 30))
+- Scan ExistingRDD[timeDiff#6L]
Run Code Online (Sandbox Code Playgroud)

事实上,使用 SQL API 可以获得相同的结果:

df.createOrReplaceTempView('df')
filtered_3 = spark.sql("SELECT * FROM df WHERE timeDiff <= 30")
filtered_3.explain()

== Physical Plan ==
*(1) Filter (isnotnull(timeDiff#6L) && (timeDiff#6L <= 30))
+- Scan ExistingRDD[timeDiff#6L]
Run Code Online (Sandbox Code Playgroud)