Pra*_*gam 125 python amazon-s3 boto3
我想知道boto3中是否存在密钥.我可以循环存储桶内容并检查密钥是否匹配.
但这似乎更长,而且有点矫枉过正.Boto3官方文档明确说明了如何做到这一点.
可能是我错过了显而易见的事.任何人都可以指出我如何实现这一目标.
Wan*_*uta 153
Boto 2的boto.s3.key.Key对象曾经有一个exists方法,通过执行HEAD请求并查看结果来检查S3上是否存在密钥,但似乎不再存在.你必须自己做:
import boto3
import botocore
s3 = boto3.resource('s3')
try:
s3.Object('my-bucket', 'dootdoot.jpg').load()
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
# The object does not exist.
...
else:
# Something else has gone wrong.
raise
else:
# The object does exist.
...
Run Code Online (Sandbox Code Playgroud)
load() 对单个键执行HEAD请求,即使相关对象很大或者您的存储桶中有很多对象,它也很快.
当然,您可能正在检查对象是否存在,因为您计划使用它.如果是这种情况,您可以忘记load()并执行get()或download_file()直接执行,然后在那里处理错误情况.
Evi*_*ter 96
我不是使用控制流异常的忠实粉丝.这是另一种适用于boto3的方法:
import boto3
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')
key = 'dootdoot.jpg'
objs = list(bucket.objects.filter(Prefix=key))
if len(objs) > 0 and objs[0].key == key:
print("Exists!")
else:
print("Doesn't exist")
Run Code Online (Sandbox Code Playgroud)
o_c*_*o_c 85
我找到的最简单的方法(也许最有效)是:
import boto3
from botocore.errorfactory import ClientError
s3 = boto3.client('s3')
try:
s3.head_object(Bucket='bucket_name', Key='file_path')
except ClientError:
# Not found
pass
Run Code Online (Sandbox Code Playgroud)
Luc*_*orr 17
在Boto3中,如果您使用list_objects检查文件夹(前缀)或文件.您可以使用响应dict中"Contents"的存在来检查对象是否存在.这是另一种避免try/except捕获的方法,如@EvilPuppetMaster建议的那样
import boto3
client = boto3.client('s3')
results = client.list_objects(Bucket='my-bucket', Prefix='dootdoot.jpg')
return 'Contents' in results
Run Code Online (Sandbox Code Playgroud)
Vin*_*ceP 17
您可以使用S3Fs,它本质上是 boto3 的包装器,用于公开典型的文件系统样式操作:
import s3fs
s3 = s3fs.S3FileSystem()
s3.exists('myfile.txt')
Run Code Online (Sandbox Code Playgroud)
Ash*_*uGG 14
您可以使用 Boto3 来实现此目的。
import boto3
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')
objs = list(bucket.objects.filter(Prefix=key))
if(len(objs)>0):
print("key exists!!")
else:
print("key doesn't exist!")
Run Code Online (Sandbox Code Playgroud)
这里的key是你要检查的路径是否存在
mar*_*vls 13
假设您只想检查密钥是否存在(而不是悄悄地覆盖它),请先执行此检查:
import boto3
def key_exists(mykey, mybucket):
s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(Bucket=mybucket, Prefix=mykey)
if response:
for obj in response['Contents']:
if mykey == obj['Key']:
return True
return False
if key_exists('someprefix/myfile-abc123', 'my-bucket-name'):
print("key exists")
else:
print("safe to put new bucket object")
# try:
# resp = s3_client.put_object(Body="Your string or file-like object",
# Bucket=mybucket,Key=mykey)
# ...check resp success and ClientError exception for errors...
Run Code Online (Sandbox Code Playgroud)
Vit*_*ich 10
不仅client但是bucket太:
import boto3
import botocore
bucket = boto3.resource('s3', region_name='eu-west-1').Bucket('my-bucket')
try:
bucket.Object('my-file').get()
except botocore.exceptions.ClientError as ex:
if ex.response['Error']['Code'] == 'NoSuchKey':
print('NoSuchKey')
Run Code Online (Sandbox Code Playgroud)
这可以检查前缀和键,并且最多获取 1 个键。
def prefix_exits(bucket, prefix):
s3_client = boto3.client('s3')
res = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
return 'Contents' in res
Run Code Online (Sandbox Code Playgroud)
import boto3
client = boto3.client('s3')
s3_key = 'Your file without bucket name e.g. abc/bcd.txt'
bucket = 'your bucket name'
content = client.head_object(Bucket=bucket,Key=s3_key)
if content.get('ResponseMetadata',None) is not None:
print "File exists - s3://%s/%s " %(bucket,s3_key)
else:
print "File does not exist - s3://%s/%s " %(bucket,s3_key)
Run Code Online (Sandbox Code Playgroud)
FWIW,这是我正在使用的非常简单的功能
import boto3
def get_resource(config: dict={}):
"""Loads the s3 resource.
Expects AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to be in the environment
or in a config dictionary.
Looks in the environment first."""
s3 = boto3.resource('s3',
aws_access_key_id=os.environ.get(
"AWS_ACCESS_KEY_ID", config.get("AWS_ACCESS_KEY_ID")),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", config.get("AWS_SECRET_ACCESS_KEY")))
return s3
def get_bucket(s3, s3_uri: str):
"""Get the bucket from the resource.
A thin wrapper, use with caution.
Example usage:
>> bucket = get_bucket(get_resource(), s3_uri_prod)"""
return s3.Bucket(s3_uri)
def isfile_s3(bucket, key: str) -> bool:
"""Returns T/F whether the file exists."""
objs = list(bucket.objects.filter(Prefix=key))
return len(objs) == 1 and objs[0].key == key
def isdir_s3(bucket, key: str) -> bool:
"""Returns T/F whether the directory exists."""
objs = list(bucket.objects.filter(Prefix=key))
return len(objs) > 1
Run Code Online (Sandbox Code Playgroud)
使用objects.filter和检查结果列表是迄今为止检查 S3 存储桶中是否存在文件的最快方法。.
使用这个简洁的 oneliner,当您必须将它扔到现有项目中而不修改大部分代码时,它会减少干扰。
s3_file_exists = lambda filename: bool(list(bucket.objects.filter(Prefix=filename)))
Run Code Online (Sandbox Code Playgroud)
上述函数假定bucket变量已经声明。
您可以扩展 lambda 以支持其他参数,例如
s3_file_exists = lambda filename, bucket: bool(list(bucket.objects.filter(Prefix=filename)))
Run Code Online (Sandbox Code Playgroud)
现在是 2023 年了,以上都对我不起作用。这是为我做的版本:
import boto3
import botocore
s3 = boto3.client('s3')
try:
s3.head_object(Bucket='YOUR_BUCKET_NAME', Key=object_name)
except botocore.exceptions.ClientError as error:
if error.response['Error']['Code']:
print("Object does not exist!")
else:
print("Object exists!")
Run Code Online (Sandbox Code Playgroud)
如果您确实拥有权限并且请求一切正常,那么如果该对象不存在,您将收到 404。该请求还会返回 400 和 403,因此如果您想更具体地了解错误处理,您可以检查这些。
| 归档时间: |
|
| 查看次数: |
108654 次 |
| 最近记录: |