pyspark 1.6.0 写入镶木地板会出现“路径存在”错误

mad*_*mad 6 apache-spark pyspark

我从不同文件夹的镶木地板文件中读取,例如今年 2 月(一个文件夹 = 一天)

indata = sqlContext.read.parquet('/data/myfolder/201602*')
Run Code Online (Sandbox Code Playgroud)

做一些非常简单的分组和聚合

outdata = indata.groupby(...).agg()
Run Code Online (Sandbox Code Playgroud)

并想再次存储。

outdata.write.parquet(outloc)
Run Code Online (Sandbox Code Playgroud)

这是我从 bash 运行脚本的方法:

spark-submit 
  --master yarn-cluster 
  --num-executors 16 
  --executor-cores 4 
  --driver-memory 8g
  --executor-memory 16g 
  --files /etc/hive/conf/hive-site.xml  
  --driver-java-options 
  -XX:MaxPermSize=512m 
  spark_script.py
Run Code Online (Sandbox Code Playgroud)

这会产生多个工作(这是正确的术语吗?)。第一个作业成功运行。后续作业失败并显示以下错误:

  Traceback (most recent call last):
  File "spark_generate_maps.py", line 184, in <module>
    outdata.write.parquet(outloc)
  File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 471, in parquet
  File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco
pyspark.sql.utils.AnalysisException: u'path OBFUSCATED_PATH_THAT_I_CLEANED_BEFORE_SUBMIT already exists.;'
Run Code Online (Sandbox Code Playgroud)

当我只提供一个文件夹作为输入时,这工作正常。

因此,似乎第一个作业创建了该文件夹,所有后续作业都无法写入该文件夹。为什么?

以防万一这可以帮助任何人:

进口:

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf, collect_list, countDistinct, count
import pyspark.sql.functions as func
from pyspark.sql.functions import lit
import numpy as np
import sys
import math
Run Code Online (Sandbox Code Playgroud)

配置:

conf = SparkConf().setAppName('spark-compute-maps').setMaster('yarn-cluster')
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
Run Code Online (Sandbox Code Playgroud)

Sam*_*ter 9

您的问题是“为什么 Spark 迭代输入文件夹,但应用默认write模式,这在该上下文中没有意义”。

引用Spark V1.6 Python API ...

mode(saveMode)
  指定数据或表已存在时的行为。
  选项包括:
    append   将此 DataFrame 的内容附加到现有数据。
    覆盖覆盖现有数据。
    error       如果数据已经存在则抛出异常。
    忽略    忽略该操作,如果数据已经存在。

我认为outdata.write.mode('append').parquet(outloc)值得一试。