mad*_*mad 6 apache-spark pyspark
我从不同文件夹的镶木地板文件中读取,例如今年 2 月(一个文件夹 = 一天)
indata = sqlContext.read.parquet('/data/myfolder/201602*')
Run Code Online (Sandbox Code Playgroud)
做一些非常简单的分组和聚合
outdata = indata.groupby(...).agg()
Run Code Online (Sandbox Code Playgroud)
并想再次存储。
outdata.write.parquet(outloc)
Run Code Online (Sandbox Code Playgroud)
这是我从 bash 运行脚本的方法:
spark-submit
--master yarn-cluster
--num-executors 16
--executor-cores 4
--driver-memory 8g
--executor-memory 16g
--files /etc/hive/conf/hive-site.xml
--driver-java-options
-XX:MaxPermSize=512m
spark_script.py
Run Code Online (Sandbox Code Playgroud)
这会产生多个工作(这是正确的术语吗?)。第一个作业成功运行。后续作业失败并显示以下错误:
Traceback (most recent call last):
File "spark_generate_maps.py", line 184, in <module>
outdata.write.parquet(outloc)
File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 471, in parquet
File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco
pyspark.sql.utils.AnalysisException: u'path OBFUSCATED_PATH_THAT_I_CLEANED_BEFORE_SUBMIT already exists.;'
Run Code Online (Sandbox Code Playgroud)
当我只提供一个文件夹作为输入时,这工作正常。
因此,似乎第一个作业创建了该文件夹,所有后续作业都无法写入该文件夹。为什么?
以防万一这可以帮助任何人:
进口:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf, collect_list, countDistinct, count
import pyspark.sql.functions as func
from pyspark.sql.functions import lit
import numpy as np
import sys
import math
Run Code Online (Sandbox Code Playgroud)
配置:
conf = SparkConf().setAppName('spark-compute-maps').setMaster('yarn-cluster')
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
Run Code Online (Sandbox Code Playgroud)
您的问题是“为什么 Spark 迭代输入文件夹,但应用默认write
模式,这在该上下文中没有意义”。
引用Spark V1.6 Python API ...
mode(saveMode)
指定数据或表已存在时的行为。
选项包括:
append 将此 DataFrame 的内容附加到现有数据。
覆盖覆盖现有数据。
error 如果数据已经存在则抛出异常。
忽略 忽略该操作,如果数据已经存在。
我认为outdata.write.mode('append').parquet(outloc)
值得一试。
归档时间: |
|
查看次数: |
5825 次 |
最近记录: |