相关疑难解决方法(0)

如何强制Spark执行代码?

我如何强制Spark执行对map的调用,即使它认为由于其懒惰的评估而不需要执行它?

我试图把cache()地图调用,但仍然没有做到这一点.我的map方法实际上将结果上传到HDFS.所以,它并非无用,但Spark认为它是.

java hadoop scala apache-spark

28
推荐指数
2
解决办法
2万
查看次数

从Spark群集中收集数据时出现内存不足错误

我知道有很多关于Spark的内存错误的问题,但我还没有找到解决方案.

我有一个简单的工作流程:

  1. 从Amazon S3读入ORC文件
  2. filter 下到一小部分行
  3. select 一小部分列
  4. collect进入驱动程序节点(所以我可以做其他操作R)

当我运行上面然后cache表格来激发内存它需要<2GB - 与我的集群可用的内存相比很小 - 然后当我尝试collect将数据发送到我的驱动程序节点时,我收到OOM错误.

我试过运行以下设置:

  • 具有32核和244GB内存的计算机上的本地模式
  • 独立模式,具有10 x 6.2 GB执行程序和61 GB驱动程序节点

对于这些我都玩过的多种配置executor.memory,driver.memory以及driver.maxResultSize覆盖全系列我的可用内存中的可能值,但始终我结束了一个内存不足的错误的collect阶段; 或者 java.lang.OutOfMemoryError: Java heap space,
java.lang.OutOfMemoryError : GC overhead limit exceeded或者 Error in invoke_method.spark_shell_connection(spark_connection(jobj), : No status is returned.(sparklyr表示存储器问题的错误).

根据我对Spark的[有限]理解,在收集之前缓存一个表应强制进行所有计算 - 即如果表在缓存<2GB后快乐地坐在内存中,那么我不需要超过2GB的内存来收集它进入驱动程序节点.

请注意,这个问题的答案有一些我尚未尝试的建议,但这些可能会影响性能(例如序列化RDD),所以如果可能的话,我们希望避免使用.

我的问题:

  1. 如何在缓存后占用如此少空间的数据帧会导致内存问题?
  2. 在我继续讨论可能影响性能的其他选项之前,有什么显而易见的东西可以检查/更改/排除故障以帮助解决问题吗?

谢谢

编辑:请注意以下@ Shaido的评论,cache通过Sparklyr 调用"通过执行count(*)表格来强制数据加载到内存中"[来自Sparklyr文档] - …

memory apache-spark sparklyr

13
推荐指数
1
解决办法
8466
查看次数

计算Spark数据帧的大小 - SizeEstimator会产生意外结果

我试图找到一种可靠的方法来以编程方式计算Spark数据帧的大小(以字节为单位).

原因是我希望有一种方法来计算"最佳"分区数量("最佳"可能意味着不同的东西:它可能意味着 具有最佳分区大小,或者写入Parquet时产生最佳文件大小表 - 但两者都可以假设为数据帧大小的某些线性函数.换句话说,我想调用coalesce(n)repartition(n)在数据帧上,其中n不是固定数字,而是数据帧大小的函数.

关于SO的其他主题建议使用SizeEstimator.estimatefrom org.apache.spark.util来获取数据帧的字节大小,但我得到的结果是不一致的.

首先,我将数据帧保存到内存中:

df.cache().count 
Run Code Online (Sandbox Code Playgroud)

Spark UI在"存储"选项卡中显示大小为4.8GB.然后,我运行以下命令来获取大小SizeEstimator:

import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
Run Code Online (Sandbox Code Playgroud)

这给出了115'715'808字节= ~116MB的结果.但是,应用于SizeEstimator不同的对象会导致非常不同的结果.例如,我尝试分别为数据帧中的每一行计算大小并将它们相加:

df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)

这导致12'084'698'256字节= ~12GB的大小.或者,我可以尝试应用于SizeEstimator每个分区:

df.mapPartitions(
    iterator => Seq(SizeEstimator.estimate(
        iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)

这又导致10'792'965'376字节的不同大小=〜10.8GB.

我知道存在内存优化/内存开销,但在执行这些测试之后,我没有看到如何SizeEstimator使用它来获得足够好的数据帧大小估计(以及因此分区大小或结果Parquet文件大小).

SizeEstimator为了获得对数据帧大小或其分区的良好估计,应用适当的方法(如果有的话)是什么?如果没有,这里建议的方法是什么?

apache-spark spark-dataframe

10
推荐指数
3
解决办法
1万
查看次数

为什么dataset.count()比rdd.count()更快?

我创建了一个Spark Dataset[Long]:

scala> val ds = spark.range(100000000)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)

当我运行ds.count它给我结果0.2s(在4 Core 8GB机器上).此外,它创建的DAG如下:

在此输入图像描述

但是,当我跑的ds.rdd.count时候给了我结果4s(同一台机器).但它创建的DAG如下:

在此输入图像描述

所以,我的怀疑是:

  1. 为什么ds.rdd.count只创造一个阶段而ds.count创造两个阶段?
  2. 此外,当ds.rdd.count只有一个阶段时,为什么它比ds.count两个阶段慢?

performance scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
4141
查看次数