我有一个DataFrame通过运行sqlContext.readParquet文件创建的.
在DataFrame由300个M行.我需要使用这些行作为另一个函数的输入,但我想以较小的批量执行它以防止OOM错误.
目前,我正在使用df.head(1000000)读取前1M行,但我找不到一种方法来读取后续行.我试过df.collect(),但它给了我一个Java OOM错误.
我想迭代这个数据帧.我尝试使用withColumn()API添加另一个列来生成一组唯一值来迭代,但数据框中的所有现有列都没有唯一值.
例如,我尝试了val df = df1.withColumn("newColumn", df1("col") + 1),以及val df = df1.withColumn("newColumn",lit(i+=1))两者都没有返回一组连续的值.
获取数据帧的前n行然后接下来的n行的任何其他方法,其作用类似于范围函数SqlContext?
我们尝试使用各种设置组合 - 但 mpstat 显示所有或大多数 cpu 始终被使用(在单个 8 核系统上)
已经尝试了以下内容:
将主设置为:
local[2]
Run Code Online (Sandbox Code Playgroud)
发送
conf.set("spark.cores.max","2")
Run Code Online (Sandbox Code Playgroud)
在火花配置中
还使用
--total-executor-cores 2
Run Code Online (Sandbox Code Playgroud)
和
--executor-cores 2
Run Code Online (Sandbox Code Playgroud)
在所有情况下
mpstat -A
Run Code Online (Sandbox Code Playgroud)
显示所有 CPU 都在使用 - 而不仅仅是主机。
所以我现在很茫然。我们确实需要将使用限制为指定数量的 CPU。
我在 emr 中使用 Jupyter Notebook 来处理大块数据。在处理数据时我看到这个错误:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 108 tasks (1027.9 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
Run Code Online (Sandbox Code Playgroud)
看来我需要更新 Spark 配置中的 maxResultsSize 。如何从 jupyter 笔记本设置 Spark maxResultsSize。
已经检查过这篇文章:Spark 1.4增加maxResultSize内存
另外,在 emr 笔记本中,已经给出了 Spark 上下文,有什么方法可以编辑 Spark 上下文并增加 maxResultsSize
任何线索都会非常有帮助。
谢谢