Gop*_*ala 8 memory distributed-computing bigdata apache-spark pyspark
I have JSON data which I am reading into a dataframe with several fields, repartitioning by two columns, and converting to Pandas.
This job keeps failing on EMR on only 600,000 rows of data with some obscure errors. I have also increased the memory settings of the Spark driver, but still don't see any resolution.
Here is my pyspark code:
enhDataDf = (
    sqlContext
    .read.json(sys.argv[1])
)

enhDataDf = (
    enhDataDf
    .repartition('column1', 'column2')
    .toPandas()
)
enhDataDf = sqlContext.createDataFrame(enhDataDf)
enhDataDf = (
    enhDataDf
    .toJSON()
    .saveAsTextFile(sys.argv[2])
)
My Spark settings are as follows:
conf = SparkConf().setAppName('myapp1')
conf.set('spark.yarn.executor.memoryOverhead', 8192)
conf.set('spark.executor.memory', 8192)
conf.set('spark.driver.memory', 8192)
sc = SparkContext(conf=conf)
The error I am getting is:
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
The code runs fine on up to approximately 600,000 JSON lines, even with plenty of memory available. Beyond that, it keeps failing.
Any thoughts on what is happening and how to debug / fix this problem?
I believe that the problem comes from the following part of your code:
enhDataDf = (
    enhDataDf
    .repartition('column1', 'column2')
    .toPandas()
)
toPandas() collects the data to the driver, so as the number of records grows it will result in driver failure.
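One quick way to confirm this is to shrink the amount of data that actually reaches the driver before calling toPandas. This is only a sketch assuming a sample is acceptable for your use case; the fraction and seed are arbitrary placeholders, not values from your question:

# Collect only a small sample to the driver instead of the full dataset.
# The fraction is a placeholder you would tune to what fits in driver memory.
sampleDf = (
    sqlContext
    .read.json(sys.argv[1])
    .sample(False, 0.01, 42)
    .toPandas()
)

If the job succeeds with a sample but fails on the full data, the driver-side collect is the bottleneck.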
Based on your comments, this is the exact pipeline you use. That means the whole stage is not only redundant but also incorrect. When data is collected and then parallelized again, there is no guarantee that the partitioning introduced by

.repartition('column1', 'column2')

will be preserved when you re-create the Spark DataFrame:

sqlContext.createDataFrame(enhDataDf)
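One way to see that the layout does not survive the round trip (a sketch added for illustration, not part of the original post) is to compare the partitioning before and after re-creating the DataFrame:

partitioned = enhDataDf.repartition('column1', 'column2')
print(partitioned.rdd.getNumPartitions())

# Re-creating the DataFrame from Pandas parallelizes the local data from scratch,
# so the column-based layout is gone and the partition count will typically differ.
roundTripped = sqlContext.createDataFrame(partitioned.toPandas())
print(roundTripped.rdd.getNumPartitions())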
If you want data written partitioned by these columns, you can do it directly:
(sqlContext
    .read.json(sys.argv[1])
    .repartition('column1', 'column2')
    .write
    .json(sys.argv[2]))
skipping the intermediate toPandas and the conversion to an RDD.
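If by writing "by columns" you actually want one output directory per value of those columns, DataFrameWriter.partitionBy can do that as well; a sketch, assuming that layout is what you are after:

(sqlContext
    .read.json(sys.argv[1])
    .write
    .partitionBy('column1', 'column2')
    .json(sys.argv[2]))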
Regarding your comment:
If toPandas serves a purpose, it will always remain a limiting factor in the pipeline, and the only direct solution is to scale up the driver node. Depending on the exact algorithms you apply to the collected data, you may want to consider alternative options.
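On scaling up the driver, one thing worth double-checking (my assumption, not something shown in your logs): when the driver runs in client mode, spark.driver.memory has to be set before the driver JVM starts, so setting it through SparkConf inside the application may have no effect. Passing it to spark-submit is the safer route, and memory values are usually written as strings with a unit suffix:

conf = SparkConf().setAppName('myapp1')
# Memory-related values are usually passed as strings with an explicit unit.
conf.set('spark.yarn.executor.memoryOverhead', '8192')
conf.set('spark.executor.memory', '8g')
# spark.driver.memory for a client-mode driver is best supplied at launch time, e.g.
#   spark-submit --driver-memory 8g myapp.py <input> <output>
# (myapp.py is a placeholder for your script name)
sc = SparkContext(conf=conf)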