如何检查boto3 S3.Client.upload_fileobj是否成功?

Ago*_*iro 5 python-3.x boto3

我想在S3上保存长时间运行的结果。这项工作是在Python中实现的,因此我正在使用boto3。该用户指南说,使用S3.Client.upload_fileobj用于此目的,其工作正常,但我无法弄清楚如何检查是否上传成功。根据文档,该方法不返回任何内容,也不会引发错误。该Callback参数似乎旨在用于进度跟踪,而不是错误检查。还不清楚方法调用是同步还是异步。

如果由于某种原因上传失败,我想将内容保存到磁盘并记录错误。所以我的问题是:如何检查boto3 S3.Client.upload_fileobj调用是否成功,如果失败,则要进行一些错误处理?

reu*_*ano 14

head_object我使用和的组合wait_until_exists

import boto3

from botocore.exceptions import ClientError, WaiterError

session = boto3.Session()
s3_client = session.client('s3')
s3_resource = session.resource('s3')


def upload_src(src, filename, bucketName):
    success = False

    try:
        bucket = s3_resource.Bucket(bucketName)
    except ClientError as e:
        bucket = None

    try:
        # In case filename already exists, get current etag to check if the 
        # contents change after upload
        head = s3_client.head_object(Bucket=bucketName, Key=filename)
    except ClientError:
        etag = ''
    else:
        etag = head['ETag'].strip('"')

    try:
        s3_obj = bucket.Object(filename)
    except ClientError, AttributeError:
        s3_obj = None

    try:
        s3_obj.upload_fileobj(src)
    except ClientError, AttributeError:
        pass
    else:
        try:
            s3_obj.wait_until_exists(IfNoneMatch=etag)
        except WaiterError as e:
            pass
        else:
            head = s3_client.head_object(Bucket=bucketName, Key=filename)
            success = head['ContentLength']

    return success
Run Code Online (Sandbox Code Playgroud)

  • 使用 Python 3.8,我必须将多个异常转换为元组。例如 except (ClientError, AttributeError):... 我真的很喜欢这里的 ETag 检查。 (2认同)