Mar*_*usz 6 apache-spark pyspark spark-koalas
在比较 pyspark 3.2.1 中的两个 API 时,我得到了奇怪的性能结果,这两个 API 提供了在 Spark Dataframe 的分组结果上运行 pandas UDF 的能力:
首先,我在本地 Spark 模式(Spark 3.2.1)下运行以下输入生成器代码:
import pyspark.sql.types as types
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import pyspark.pandas as ps
spark = SparkSession.builder \
.config("spark.sql.execution.arrow.pyspark.enabled", True) \
.getOrCreate()
ps.set_option("compute.default_index_type", "distributed")
spark.range(1000000).withColumn('group', (col('id') / 10).cast('int')) \
.write.parquet('/tmp/sample_input', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
然后我测试applyInPandas:
def getsum(pdf):
pdf['sum_in_group'] = pdf['id'].sum()
return pdf
df = spark.read.parquet(f'/tmp/sample_input')
output_schema = types.StructType(
df.schema.fields + [types.StructField('sum_in_group', types.FloatType())]
)
df.groupBy('group').applyInPandas(getsum, schema=output_schema) \
.write.parquet('/tmp/schematest', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
代码执行时间不到 30 秒(在 i7-9750H CPU 上)
然后,我尝试了新的 API - 虽然我真的很欣赏代码的样子:
def getsum(pdf) -> ps.DataFrame["id": int, "group": int, "sum_in_group": int]:
pdf['sum_in_group'] = pdf['id'].sum()
return pdf
df = ps.read_parquet(f'/tmp/sample_input')
df.groupby('group').apply(getsum) \
.to_parquet('/tmp/schematest', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
...每次在同一个 CPU 上执行时间至少为 1m 40s ,因此这个简单操作的速度慢了 3 倍以上。
我知道sum_in_group在没有 panadas 参与的情况下添加可以更有效地完成,但这只是提供一个最小的例子。任何其他操作也至少慢 3 倍。
您知道造成这种放缓的原因是什么吗?也许我缺少一些上下文参数来使它们在相似的时间执行?
| 归档时间: |
|
| 查看次数: |
2155 次 |
| 最近记录: |