Pyspark简单的重新分区和toPandas()无法完成600,000多行

Gop*_*ala 8 memory distributed-computing bigdata apache-spark pyspark

我有JSON数据,我正在读取具有多个字段的数据框,基于两列重新分区,并转换为Pandas.

这项工作在仅有600,000行数据的EMR上失败,但有一些模糊的错误.我还增加了火花驱动器的内存设置,但仍然没有看到任何分辨率.

这是我的pyspark代码:

enhDataDf = (
    sqlContext
    .read.json(sys.argv[1])
    )

enhDataDf = (
    enhDataDf
    .repartition('column1', 'column2')
    .toPandas()
    )
enhDataDf = sqlContext.createDataFrame(enhDataDf)
enhDataDf = (
    enhDataDf
    .toJSON()
    .saveAsTextFile(sys.argv[2])
    )
Run Code Online (Sandbox Code Playgroud)

我的火花设置如下:

conf = SparkConf().setAppName('myapp1')
conf.set('spark.yarn.executor.memoryOverhead', 8192)
conf.set('spark.executor.memory', 8192)
conf.set('spark.driver.memory', 8192)
sc = SparkContext(conf=conf)
Run Code Online (Sandbox Code Playgroud)

我得到的错误是:

16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties].
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties].
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down.
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down.
Run Code Online (Sandbox Code Playgroud)

该代码在最多约600,000个JSON行上运行良好 - 即使有大量可用内存.然后,它一直在失败.

有关正在发生的事情以及如何调试/修复此问题的任何想法?

zer*_*323 4

我相信问题来自于您的代码的以下部分:

enhDataDf = (
    enhDataDf
    .repartition('column1', 'column2')
    .toPandas()
)
Run Code Online (Sandbox Code Playgroud)

.toPandas()收集数据,因此当记录数量增加时,将导致驱动程序故障。

根据您的评论,这就是您使用的确切管道。这意味着整个舞台不仅过时而且不正确。当数据被收集并进一步并行化时,可以保证由

.repartition('column1', 'column2')
Run Code Online (Sandbox Code Playgroud)

当您重新创建 Spark 时将被保留DataFrame

sqlContext.createDataFrame(enhDataDf)
Run Code Online (Sandbox Code Playgroud)

如果你想按列写入数据,可以直接这样做:

(sqlContext
    .read.json(sys.argv[1])
    .repartition('column1', 'column2')
    .write
    .json(sys.argv[2]))
Run Code Online (Sandbox Code Playgroud)

跳过中间过程toPandas并转换为 RDD。

根据您的评论:

如果toPandas有目的,那么它将始终成为管道中的限制因素,唯一直接的解决方案是扩大驱动程序节点的规模。根据您对收集的数据应用的具体算法,您可以考虑替代选项:

  • 重新实现您在 Spark 之上使用的算法尚不可用。
  • 考虑具有更好的 SciPy 堆栈互操作性的替代框架(例如Dask)。