小编Pau*_*ang的帖子

Spark写入S3 V4 SignatureDoesNotMatch错误

SignatureDoesNotMatch在尝试使用Spark将Dataframe写入S3 时遇到了S3 .

症状/事情尝试过:

  • 代码有时失败但有时起作用;
  • 代码可以从S3 中读取而没有任何问题,并且能够不时地写入S3,这排除了错误的配置设置,如S3A/enableV4 /错误的键/区域端点等.
  • S3A端点是根据S3 docs S3 Endpoint设置的 ;
  • 确保AWS_SECRETY_KEY不包含任何非字母数字,如此处所示 ;
  • 使用NTP确保服务器时间同步;
  • 以下是对EC2测试m3.xlargespark-2.0.2-bin-hadoop2.7在本地模式下运行;
  • 当文件写入本地fs时,问题就消失了;
  • 现在解决方法是使用s3fs挂载存储桶并写入其中; 然而这并不理想,因为s3fs经常会因为Spark的压力而死亡;

代码可以归结为:

spark-submit\
    --verbose\
    --conf spark.hadoop.fs.s3n.impl=org.apache.hadoop.fs.s3native.NativeS3FileSystem \
    --conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3.S3FileSystem \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem\
    --packages org.apache.hadoop:hadoop-aws:2.7.3\
    --driver-java-options '-Dcom.amazonaws.services.s3.enableV4'\
    foobar.py


# foobar.py
sc = SparkContext.getOrCreate()
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", 'xxx')
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 'xxx')
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", 's3.dualstack.ap-southeast-2.amazonaws.com')

hc = SparkSession.builder.enableHiveSupport().getOrCreate()
dataframe = hc.read.parquet(in_file_path)

dataframe.write.csv(
    path=out_file_path,
    mode='overwrite',
    compression='gzip',
    sep=',',
    quote='"',
    escape='\\',
    escapeQuotes='true',
) …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-web-services apache-spark

10
推荐指数
2
解决办法
2727
查看次数

Catch异常获取UnboundLocalError

我写了一个爬虫来从Q&A网站上获取信息.由于并非所有字段都始终显示在页面中,因此我使用了多个try-excepts来处理这种情况.

def answerContentExtractor( loginSession, questionLinkQueue , answerContentList) :
    while True:
        URL = questionLinkQueue.get()
        try:
            response   = loginSession.get(URL,timeout = MAX_WAIT_TIME)
            raw_data   = response.text

            #These fields must exist, or something went wrong...
            questionId = re.findall(REGEX,raw_data)[0]
            answerId   = re.findall(REGEX,raw_data)[0]
            title      = re.findall(REGEX,raw_data)[0]

        except requests.exceptions.Timeout ,IndexError:
            print >> sys.stderr, URL + " extraction error..."
            questionLinkQueue.task_done()
            continue

        try:
            questionInfo = re.findall(REGEX,raw_data)[0]
        except IndexError:
            questionInfo = ""

        try:
            answerContent = re.findall(REGEX,raw_data)[0]
        except IndexError:
            answerContent = ""

        result = {
                  'questionId'   : questionId,
                  'answerId'     : answerId,
                  'title' …
Run Code Online (Sandbox Code Playgroud)

python exception

6
推荐指数
1
解决办法
4190
查看次数

是否有erlang null语句

erlang是否在python或';'中有'pass'之类的空语句 在C?
有时我想测试代码而不用杀死所有进程,清理ets表并从头开始.

try
    ets:new(TableName,[options])
catch
    % if the ets table has been initialized in the earlier test.
    error:badarg->
        % I want an empty statement instead of an ugly io:format
        io:format("")
Run Code Online (Sandbox Code Playgroud)

erlang null

3
推荐指数
1
解决办法
3774
查看次数