我如何强制Spark执行对map的调用,即使它认为由于其懒惰的评估而不需要执行它?
我试图把cache()地图调用,但仍然没有做到这一点.我的map方法实际上将结果上传到HDFS.所以,它并非无用,但Spark认为它是.
我知道有很多关于Spark的内存错误的问题,但我还没有找到解决方案.
我有一个简单的工作流程:
filter 下到一小部分行 select 一小部分列collect进入驱动程序节点(所以我可以做其他操作R)当我运行上面然后cache表格来激发内存它需要<2GB - 与我的集群可用的内存相比很小 - 然后当我尝试collect将数据发送到我的驱动程序节点时,我收到OOM错误.
我试过运行以下设置:
对于这些我都玩过的多种配置executor.memory,driver.memory以及driver.maxResultSize覆盖全系列我的可用内存中的可能值,但始终我结束了一个内存不足的错误的collect阶段; 或者
java.lang.OutOfMemoryError: Java heap space,
java.lang.OutOfMemoryError : GC overhead limit exceeded或者
Error in invoke_method.spark_shell_connection(spark_connection(jobj), :
No status is returned.(sparklyr表示存储器问题的错误).
根据我对Spark的[有限]理解,在收集之前缓存一个表应强制进行所有计算 - 即如果表在缓存<2GB后快乐地坐在内存中,那么我不需要超过2GB的内存来收集它进入驱动程序节点.
请注意,这个问题的答案有一些我尚未尝试的建议,但这些可能会影响性能(例如序列化RDD),所以如果可能的话,我们希望避免使用.
我的问题:
谢谢
编辑:请注意以下@ Shaido的评论,cache通过Sparklyr 调用"通过执行count(*)表格来强制数据加载到内存中"[来自Sparklyr文档] - …
我试图找到一种可靠的方法来以编程方式计算Spark数据帧的大小(以字节为单位).
原因是我希望有一种方法来计算"最佳"分区数量("最佳"可能意味着不同的东西:它可能意味着 具有最佳分区大小,或者在写入Parquet时产生最佳文件大小表 - 但两者都可以假设为数据帧大小的某些线性函数.换句话说,我想调用coalesce(n)或repartition(n)在数据帧上,其中n不是固定数字,而是数据帧大小的函数.
关于SO的其他主题建议使用SizeEstimator.estimatefrom org.apache.spark.util来获取数据帧的字节大小,但我得到的结果是不一致的.
首先,我将数据帧保存到内存中:
df.cache().count
Run Code Online (Sandbox Code Playgroud)
Spark UI在"存储"选项卡中显示大小为4.8GB.然后,我运行以下命令来获取大小SizeEstimator:
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
Run Code Online (Sandbox Code Playgroud)
这给出了115'715'808字节= ~116MB的结果.但是,应用于SizeEstimator不同的对象会导致非常不同的结果.例如,我尝试分别为数据帧中的每一行计算大小并将它们相加:
df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这导致12'084'698'256字节= ~12GB的大小.或者,我可以尝试应用于SizeEstimator每个分区:
df.mapPartitions(
iterator => Seq(SizeEstimator.estimate(
iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这又导致10'792'965'376字节的不同大小=〜10.8GB.
我知道存在内存优化/内存开销,但在执行这些测试之后,我没有看到如何SizeEstimator使用它来获得足够好的数据帧大小估计(以及因此分区大小或结果Parquet文件大小).
SizeEstimator为了获得对数据帧大小或其分区的良好估计,应用适当的方法(如果有的话)是什么?如果没有,这里建议的方法是什么?
我创建了一个Spark Dataset[Long]:
scala> val ds = spark.range(100000000)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)
当我运行ds.count它给我结果0.2s(在4 Core 8GB机器上).此外,它创建的DAG如下:
但是,当我跑的ds.rdd.count时候给了我结果4s(同一台机器).但它创建的DAG如下:
所以,我的怀疑是:
ds.rdd.count只创造一个阶段而ds.count创造两个阶段?ds.rdd.count只有一个阶段时,为什么它比ds.count两个阶段慢?performance scala apache-spark apache-spark-sql apache-spark-dataset