小编Wen*_*Wit的帖子

Pyspark:序列化任务超过了允许的最大值。考虑增加 spark.rpc.message.maxSize 或对大值使用广播变量

我正在对集群进行计算,最后当我使用 df.describe().show() 询问有关 Spark 数据帧的摘要统计信息时,出现错误:

序列化任务 15:0 为 137500581 字节,超过了最大允许值:spark.rpc.message.maxSize(134217728 字节)。考虑增加 spark.rpc.message.maxSize 或对大值使用广播变量

在我的 Spark 配置中,我已经尝试增加上述参数:

spark = (SparkSession
         .builder
         .appName("TV segmentation - dataprep for scoring")
         .config("spark.executor.memory", "25G")
         .config("spark.driver.memory", "40G")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.maxExecutors", "12")
         .config("spark.driver.maxResultSize", "3g")
         .config("spark.kryoserializer.buffer.max.mb", "2047mb")
         .config("spark.rpc.message.maxSize", "1000mb")
         .getOrCreate())
Run Code Online (Sandbox Code Playgroud)

我还尝试使用以下方法重新分区我的数据框:

spark = (SparkSession
         .builder
         .appName("TV segmentation - dataprep for scoring")
         .config("spark.executor.memory", "25G")
         .config("spark.driver.memory", "40G")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.maxExecutors", "12")
         .config("spark.driver.maxResultSize", "3g")
         .config("spark.kryoserializer.buffer.max.mb", "2047mb")
         .config("spark.rpc.message.maxSize", "1000mb")
         .getOrCreate())
Run Code Online (Sandbox Code Playgroud)

但我仍然不断收到同样的错误。

我的环境:Python 3.5、Anaconda 5.0、Spark 2

我怎样才能避免这个错误?

rpc message max-size dataframe pyspark

13
推荐指数
3
解决办法
1万
查看次数

计算pyspark数据框列上的百分位数

我有一个PySpark数据框,其中包含一个ID,然后包含几个要为其计算95%点的变量。

printSchema()的一部分:

root
 |-- ID: string (nullable = true)
 |-- MOU_G_EDUCATION_ADULT: double (nullable = false)
 |-- MOU_G_EDUCATION_KIDS: double (nullable = false)
Run Code Online (Sandbox Code Playgroud)

在python中找到了如何使用Spark Data frame和GroupBy导出百分位数,但这失败并显示错误消息:

perc95_udf = udf(lambda x: x.quantile(.95))


fanscores = genres.withColumn("P95_MOU_G_EDUCATION_ADULT", perc95_udf('MOU_G_EDUCATION_ADULT')) \
                      .withColumn("P95_MOU_G_EDUCATION_KIDS", perc95_udf('MOU_G_EDUCATION_KIDS'))

fanscores.take(2) 
Run Code Online (Sandbox Code Playgroud)

AttributeError:“ float”对象没有属性“ quantile”

我已经尝试过的其他UDF试验:

def percentile(quantiel,kolom):
    x=np.array(kolom)
    perc=np.percentile(x, quantiel)
    return perc

percentile_udf = udf(percentile, LongType())


fanscores = genres.withColumn("P95_MOU_G_EDUCATION_ADULT", percentile_udf(quantiel=95, kolom=genres.MOU_G_EDUCATION_ADULT)) \
                  .withColumn("P95_MOU_G_EDUCATION_KIDS", percentile_udf(quantiel=95, kolom=genres.MOU_G_EDUCATION_KIDS))

fanscores.take(2)   
Run Code Online (Sandbox Code Playgroud)

给出错误:“ TypeError:wrapper()得到了意外的关键字参数'quantiel'”

我的最终审判:

import numpy as np

def percentile(quantiel):
    return udf(lambda kolom: np.percentile(np.array(kolom), quantiel)) …
Run Code Online (Sandbox Code Playgroud)

percentile quantile dataframe pyspark

4
推荐指数
1
解决办法
3927
查看次数

标签 统计

dataframe ×2

pyspark ×2

max-size ×1

message ×1

percentile ×1

quantile ×1

rpc ×1