Cas*_*210 · 4 · python, subprocess, amazon-s3, amazon-web-services
I'm trying to write a Python script that uses the subprocess module to copy files from one S3 bucket to another. To improve performance, I want to run separate sync commands for different prefixes in parallel.
The script I have so far never terminates, and I'm not sure whether the child processes actually run concurrently.
import subprocess
prefix = ['prefix1','prefix2','prefix3']
source_bucket = 's3://source'
dest_bucket = 's3://dest'
commands = []
for p in prefix:
    command = 'aws s3 sync source_bucket' + p + ' dest_bucket'
    commands.append(command)
procs = [subprocess.Popen(i, shell=True, stdout=subprocess.PIPE) for i in commands]
for p in procs:
    p.wait()
Is there a better way to do this? Any help is appreciated.
Because you are passing in subprocess.PIPE, the different processes will block while waiting for output. You need to run a separate process to communicate with each aws instance. One possibility is to use Python's multiprocessing:
import subprocess
import multiprocessing
def worker(command, queue):
    # Don't use shell here, we can avoid the overhead
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True)
    # Read from the aws command output and send it off to the queue as soon
    # as it's available
    for line in proc.stdout:
        queue.put(line.rstrip("\r\n"))
    # Notify the listener that we're done
    queue.put(None)
def main():
    # Move all work to a function so it's multiprocessing safe, see
    # https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    # Note: Adding trailing slash so "source_bucket" + "prefix" is a valid S3 URI
    prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
    source_bucket = 's3://source/'
    dest_bucket = 's3://dest/'
    # Create a queue to gather final messages from each worker
    queue = multiprocessing.Queue()
    procs = []
    for p in prefixes:
        # Pass in --no-progress so the progress messages aren't shown;
        # displaying those messages is complicated and requires quite a bit
        # of work to make sure they don't interfere with each other
        # Correct the command syntax here to use all the variables
        # Need to pass the prefix to the dest URI as well so the same structure
        # is maintained
        # Use an argv-style call here so we can avoid bringing the shell into this
        command = ['aws', 's3', 'sync', '--no-progress', source_bucket + p, dest_bucket + p]
        # Hand off the work to a worker that reads from the pipe to prevent each
        # spawned aws instance from blocking
        proc = multiprocessing.Process(target=worker, args=(command, queue))
        proc.start()
        procs.append(proc)
    # Read from the queue to show the output
    left = len(procs)
    while left > 0:
        msg = queue.get()
        if msg is None:
            # This means a worker is done
            left -= 1
        else:
            # Just print the output, doing it in one process to prevent any
            # collision possibilities
            print(msg)
    # Clean up
    for proc in procs:
        proc.join()

if __name__ == "__main__":
    main()
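If you don't need to stream each line of output as soon as it is produced, a shorter variant is to let a thread pool manage the workers and have subprocess.run() drain each child's output. The sketch below is not part of the original answer; the bucket names and prefixes are placeholders, and it assumes Python 3.7+ for capture_output/text:

import subprocess
from concurrent.futures import ThreadPoolExecutor

prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
source_bucket = 's3://source/'
dest_bucket = 's3://dest/'

def sync(prefix):
    # capture_output=True drains stdout/stderr into memory so the child can't
    # block on a full pipe; run() itself blocks this thread until the sync ends
    return subprocess.run(
        ['aws', 's3', 'sync', '--no-progress',
         source_bucket + prefix, dest_bucket + prefix],
        capture_output=True, text=True, check=True)

with ThreadPoolExecutor(max_workers=len(prefixes)) as pool:
    # map() yields results in prefix order once each sync has finished
    for result in pool.map(sync, prefixes):
        print(result.stdout, end='')

Threads are enough here because each worker spends its time waiting on the external aws process rather than doing CPU-bound work in Python; check=True will raise CalledProcessError if any sync fails.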