Ray*_*nda 6 python synchronization amazon-s3 boto3
我注意到 boto3 中没有可通过命令行执行的“同步”操作的 API。
所以,
如何使用 boto3 将本地文件夹同步到给定的存储桶?
我刚刚为这个问题实现了一个简单的类。我把它贴在这里希望它可以帮助任何有同样问题的人。
您可以修改 S3Sync.sync 以将文件大小考虑在内。
class S3Sync:
"""
Class that holds the operations needed for synchronize local dirs to a given bucket.
"""
def __init__(self):
self._s3 = boto3.client('s3')
def sync(self, source: str, dest: str) -> [str]:
"""
Sync source to dest, this means that all elements existing in
source that not exists in dest will be copied to dest.
No element will be deleted.
:param source: Source folder.
:param dest: Destination folder.
:return: None
"""
paths = self.list_source_objects(source_folder=source)
objects = self.list_bucket_objects(dest)
# Getting the keys and ordering to perform binary search
# each time we want to check if any paths is already there.
object_keys = [obj['Key'] for obj in objects]
object_keys.sort()
object_keys_length = len(object_keys)
for path in paths:
# Binary search.
index = bisect_left(object_keys, path)
if index == object_keys_length:
# If path not found in object_keys, it has to be sync-ed.
self._s3.upload_file(str(Path(source).joinpath(path)), Bucket=dest, Key=path)
def list_bucket_objects(self, bucket: str) -> [dict]:
"""
List all objects for the given bucket.
:param bucket: Bucket name.
:return: A [dict] containing the elements in the bucket.
Example of a single object.
{
'Key': 'example/example.txt',
'LastModified': datetime.datetime(2019, 7, 4, 13, 50, 34, 893000, tzinfo=tzutc()),
'ETag': '"b11564415be7f58435013b414a59ae5c"',
'Size': 115280,
'StorageClass': 'STANDARD',
'Owner': {
'DisplayName': 'webfile',
'ID': '75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a'
}
}
"""
try:
contents = self._s3.list_objects(Bucket=bucket)['Contents']
except KeyError:
# No Contents Key, empty bucket.
return []
else:
return contents
@staticmethod
def list_source_objects(source_folder: str) -> [str]:
"""
:param source_folder: Root folder for resources you want to list.
:return: A [str] containing relative names of the files.
Example:
/tmp
- example
- file_1.txt
- some_folder
- file_2.txt
>>> sync.list_source_objects("/tmp/example")
['file_1.txt', 'some_folder/file_2.txt']
"""
path = Path(source_folder)
paths = []
for file_path in path.rglob("*"):
if file_path.is_dir():
continue
str_file_path = str(file_path)
str_file_path = str_file_path.replace(f'{str(path)}/', "")
paths.append(str_file_path)
return paths
if __name__ == '__main__':
sync = S3Sync()
sync.sync("/temp/some_folder", "some_bucket_name")
Run Code Online (Sandbox Code Playgroud)
@Z.Wei 评论:
深入研究一下以处理奇怪的 bisect 函数。我们可以使用 if path not in object_keys:?
我认为这是一个有趣的问题,值得更新答案,不要迷失在评论中。
回答:
不,if path not in object_keys将执行线性搜索O(n)。bisect_* 执行二进制搜索(列表必须排序),其为 O(log(n))。
大多数情况下,您将处理足够多的对象来进行排序和二进制搜索,这通常比仅使用 in 关键字更快。
考虑到您必须使用in O(m * n)检查源中的每条路径与目标中的每条路径,其中 m 是源中的对象数,而 n 是目标中的对象数。使用 bisect 整个事情是O( n * log(n) )
如果我考虑一下,您可以使用集合使算法更快(并且简单,因此更具有 Python 风格):
def sync(self, source: str, dest: str) -> [str]:
# Local paths
paths = set(self.list_source_objects(source_folder=source))
# Getting the keys (remote s3 paths).
objects = self.list_bucket_objects(dest)
object_keys = set([obj['Key'] for obj in objects])
# Compute the set difference: What we have in paths that does
# not exists in object_keys.
to_sync = paths - object_keys
sournce_path = Path(source)
for path in to_sync:
self._s3.upload_file(str(sournce_path / path),
Bucket=dest, Key=path)
Run Code Online (Sandbox Code Playgroud)
搜索在sets为O(1),因此,使用一套整个事情将是O(n)的方式更快,以前的O(M *的log(n)) 。
代码可以改进更多的方法list_bucket_objects并list_source_objects返回集合而不是列表。
| 归档时间: |
|
| 查看次数: |
5588 次 |
| 最近记录: |