Kon*_*tin 2 python pandas pyspark pyspark-sql
I have a question about the difference in execution time when filtering Pandas and pyspark dataframes:
import time
import numpy as np
import pandas as pd
from random import shuffle
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame(np.random.randint(1000000, size=400000).reshape(-1, 2))
list_filter = list(range(1000))
shuffle(list_filter)
# pandas is fast
t0 = time.time()
df_filtered = df[df[0].isin(list_filter)]
print(time.time() - t0)
# 0.0072
df_spark = spark.createDataFrame(df)
# pyspark is slow
t0 = time.time()
df_spark_filtered = df_spark[df_spark[0].isin(list_filter)]
print(time.time() - t0)
# 3.1232
If I increase the length of list_filter to 10000, the execution times become 0.01353 and 17.6768 seconds. The Pandas implementation of isin seems to be computationally efficient. Can you explain why filtering a pyspark dataframe is so slow, and how can I perform such filtering fast?
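For context, my understanding (a guess on my part, not something I have verified in the pandas source) is that pandas' isin is essentially a vectorized hash-set membership test, which would explain why it scales so well. A rough pure-Python sketch of the same semantics, much slower than the real thing but producing the same mask:
filter_set = set(list_filter)
mask = df[0].apply(lambda x: x in filter_set)  # same result as df[0].isin(list_filter)
assert df[mask].equals(df_filtered)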
小智 7
To speed up the filter operation in pyspark, you need to use a join instead of a filter with an isin clause:
import time
import numpy as np
import pandas as pd
from random import shuffle
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = pd.DataFrame(np.random.randint(1000000, size=400000).reshape(-1, 2))
df_spark = spark.createDataFrame(df)
list_filter = list(range(10000))
list_filter_df = spark.createDataFrame([[x] for x in list_filter], df_spark.columns[:1])
shuffle(list_filter)
# pandas is fast because everything is in memory
t0 = time.time()
df_filtered = df[df[0].isin(list_filter)]
print(time.time() - t0)
# 0.0227580165863
# pyspark is slower because of its overhead, but a broadcast join makes it fast compared to isin with a list
t0 = time.time()
df_spark_filtered = df_spark.join(F.broadcast(list_filter_df), df_spark.columns[:1])
print(time.time() - t0)
# 0.0571971035004
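One caveat worth adding (my note, not part of the original answer): Spark evaluates transformations lazily, so both timings above mostly measure query-plan construction rather than the filtering itself; the isin version is likely slow partly because building a filter expression with thousands of literals is itself expensive. To time the actual execution, trigger an action such as count():
t0 = time.time()
row_count = df_spark_filtered.count()  # count() is an action and forces the join to run
print(row_count, time.time() - t0)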