EMR 笔记本会话在大型数据帧 (pyspark) 上几秒内超时(使用 pyspark)

gar*_*and 5 python amazon-web-services amazon-emr apache-spark pyspark

我正在尝试对 pyspark 数据框执行一些操作。数据框看起来像这样:

    user    domain1    domain2 ........ domain100    conversions

    abcd      1          0     ........    0            1
    gcea      0          0     ........    1            0
     .        .          .     ........    .            .
     .        .          .     ........    .            .
     .        .          .     ........    .            .
Run Code Online (Sandbox Code Playgroud)

如果数据帧很小,我使用的代码对我来说可以很好地进一步操作上面的数据帧,例如,它对于以下形状的数据帧可以很好地工作:

    (148457,41)
Run Code Online (Sandbox Code Playgroud)

但是,如果我增加数据框的大小,例如:

    (2184934,324)
Run Code Online (Sandbox Code Playgroud)

我无法继续前进,因为一旦我在数据帧上执行任何类型的代码,甚至像 count() 操作超时这样的事情,笔记本就会超时或抛出会话超时错误消息。超时消息如下所示:

    An error was encountered:
    Invalid status code '400' from 
    https://172.31.12.103:18888/sessions/5/statements/20 with error 
    payload: 
    "requirement failed: Session isn't active."
Run Code Online (Sandbox Code Playgroud)

这个超时需要1~2秒(不需要很长时间就超时)。

我没有使用collect()或任何topandas()操作来使其超时。我试图对上述数据帧执行的操作是对数据进行欠采样,但在数据帧大小增加后,我似乎无法进行简单的 .count() 操作。

我已经尝试在 emr 集群中使用不同类型的实例来使其工作。当我使用较小的数据帧时,c5.2xlarge 类型实例就足够了,但对于较大的数据帧,即使我使用 c5.18xlarge 实例,它也不起作用。我的集群中有 1 个主节点和 2 个从节点。

这就是我正在尝试对数据框执行的操作

#Undersampling.
from pyspark.sql.functions import col, when
def resample(base_features,ratio,class_field,base_class):
    pos = base_features.filter(col(class_field)==base_class)
    neg = base_features.filter(col(class_field)!=base_class)
    total_pos = pos.count()
    total_neg = neg.count()
    fraction=float(total_pos*ratio)/float(total_neg)
    sampled = neg.sample(True,fraction)
    a = sampled.union(pos)
    return a
undersampled_df = resample(df,10,'conversions',1)
Run Code Online (Sandbox Code Playgroud)

我该如何解决这个问题?关于我应该采取哪些步骤有什么建议吗?

Kob*_*oba 1

我遇到了同样的问题,对我来说,通过火花魔法增加驾驶员记忆是有效的。默认情况下,驱动程序内存是1000M您通过 JupyterHub 创建 Spark 应用程序时的内存。做就是了

%%configure -f 
{"driverMemory": "6000M"}
Run Code Online (Sandbox Code Playgroud)

它将重新启动 Spark 应用程序,您可以通过执行以下操作来查看更新的驱动程序内存

spark.sparkContext.getConf().get('spark.driver.memory')
Run Code Online (Sandbox Code Playgroud)

希望能帮助到你。