gar*_*and 5 python amazon-web-services amazon-emr apache-spark pyspark
我正在尝试对 pyspark 数据框执行一些操作。数据框看起来像这样:
user domain1 domain2 ........ domain100 conversions
abcd 1 0 ........ 0 1
gcea 0 0 ........ 1 0
. . . ........ . .
. . . ........ . .
. . . ........ . .
Run Code Online (Sandbox Code Playgroud)
如果数据帧很小,我使用的代码对我来说可以很好地进一步操作上面的数据帧,例如,它对于以下形状的数据帧可以很好地工作:
(148457,41)
Run Code Online (Sandbox Code Playgroud)
但是,如果我增加数据框的大小,例如:
(2184934,324)
Run Code Online (Sandbox Code Playgroud)
我无法继续前进,因为一旦我在数据帧上执行任何类型的代码,甚至像 count() 操作超时这样的事情,笔记本就会超时或抛出会话超时错误消息。超时消息如下所示:
An error was encountered:
Invalid status code '400' from
https://172.31.12.103:18888/sessions/5/statements/20 with error
payload:
"requirement failed: Session isn't active."
Run Code Online (Sandbox Code Playgroud)
这个超时需要1~2秒(不需要很长时间就超时)。
我没有使用collect()或任何topandas()操作来使其超时。我试图对上述数据帧执行的操作是对数据进行欠采样,但在数据帧大小增加后,我似乎无法进行简单的 .count() 操作。
我已经尝试在 emr 集群中使用不同类型的实例来使其工作。当我使用较小的数据帧时,c5.2xlarge 类型实例就足够了,但对于较大的数据帧,即使我使用 c5.18xlarge 实例,它也不起作用。我的集群中有 1 个主节点和 2 个从节点。
这就是我正在尝试对数据框执行的操作
#Undersampling.
from pyspark.sql.functions import col, when
def resample(base_features,ratio,class_field,base_class):
pos = base_features.filter(col(class_field)==base_class)
neg = base_features.filter(col(class_field)!=base_class)
total_pos = pos.count()
total_neg = neg.count()
fraction=float(total_pos*ratio)/float(total_neg)
sampled = neg.sample(True,fraction)
a = sampled.union(pos)
return a
undersampled_df = resample(df,10,'conversions',1)
Run Code Online (Sandbox Code Playgroud)
我该如何解决这个问题?关于我应该采取哪些步骤有什么建议吗?
我遇到了同样的问题,对我来说,通过火花魔法增加驾驶员记忆是有效的。默认情况下,驱动程序内存是1000M您通过 JupyterHub 创建 Spark 应用程序时的内存。做就是了
%%configure -f
{"driverMemory": "6000M"}
Run Code Online (Sandbox Code Playgroud)
它将重新启动 Spark 应用程序,您可以通过执行以下操作来查看更新的驱动程序内存
spark.sparkContext.getConf().get('spark.driver.memory')
Run Code Online (Sandbox Code Playgroud)
希望能帮助到你。
| 归档时间: |
|
| 查看次数: |
1833 次 |
| 最近记录: |