I am porting a simple Python 3 script to AWS Lambda. The script is simple: it gathers information from a dozen S3 objects and returns the result.

The script used multiprocessing.Pool to gather all the files in parallel, but multiprocessing cannot be used in an AWS Lambda environment since /dev/shm is missing. So I thought that instead of writing a dirty multiprocessing.Process/multiprocessing.Queue replacement, I would try asyncio instead.
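The failure shows up as soon as any multiprocessing primitive backed by a POSIX semaphore is created, since those live under /dev/shm. A minimal sketch of the failure mode (the exact error text is from memory, not a captured Lambda log):

    import multiprocessing

    # On AWS Lambda this raises OSError ("Function not implemented"),
    # because the semaphore backing the queue cannot be allocated
    # without /dev/shm being mounted.
    queue = multiprocessing.Queue()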
I am using the latest version of aioboto3 (8.0.5) on Python 3.8.

My problem is that I cannot seem to get any improvement between a naive sequential download of the files and an asyncio event loop multiplexing the downloads.

Here are the two versions of my code:
    import sys
    import asyncio
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

    import boto3
    import aioboto3

    BUCKET = 'some-bucket'
    KEYS = [
        'some/key/1',
        [...]
        'some/key/10',
    ]

    async def download_aio():
        """Concurrent download of all objects from S3"""
        async with aioboto3.client('s3') as s3:
            objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
            objects = await asyncio.gather(*objects)
            buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

    def download():
        """Sequentially download all objects from S3"""
        s3 = boto3.client('s3')
        for key in KEYS:
            object = s3.get_object(Bucket=BUCKET, Key=key)
            object['Body'].read()

    def run_sequential():
        download()

    def run_concurrent():
        loop = asyncio.get_event_loop()
        # loop.set_default_executor(ProcessPoolExecutor(10))
        # loop.set_default_executor(ThreadPoolExecutor(10))
        loop.run_until_complete(download_aio())
The timings of run_sequential() and run_concurrent() are very similar (around 3 seconds for a dozen 10 MB files). I am convinced the concurrent version is not actually running concurrently, for multiple reasons: I tried switching to a Process/ThreadPoolExecutor, and the processes/threads were spawned for the duration of the function even though they did nothing. I am sure something is missing, but I just cannot figure out what.
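For reference, this is roughly how the two paths were timed (a sketch; the timed helper and time.perf_counter are illustrative additions, not part of the original script):

    import time

    def timed(label, func):
        """Crude wall-clock timing of a single run."""
        start = time.perf_counter()
        func()
        print(f'{label}: {time.perf_counter() - start:.2f}s')

    timed('sequential', run_sequential)
    timed('concurrent', run_concurrent)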
Any ideas?
After spending a few hours trying to understand how to use aioboto3 correctly, I decided to switch to my backup solution. I ended up rolling my own naive version of multiprocessing.Pool for use within an AWS Lambda environment.

If someone stumbles across this thread in the future, here it is. It is far from perfect, but for my simple case it is easy to use as a drop-in replacement for multiprocessing.Pool.
    from multiprocessing import Process, Pipe
    from multiprocessing.connection import wait

    class Pool:
        """Naive implementation of a process pool with mp.Pool API.

        This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
        is not mounted in an AWS Lambda environment.
        """

        def __init__(self, process_count=1):
            assert process_count >= 1
            self.process_count = process_count

        @staticmethod
        def wrap_pipe(pipe, index, func):
            def wrapper(args):
                try:
                    result = func(args)
                except Exception as exc:  # pylint: disable=broad-except
                    result = exc
                pipe.send((index, result))
            return wrapper

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, exc_traceback):
            pass

        def map(self, function, arguments):
            pending = list(enumerate(arguments))
            running = []
            finished = [None] * len(pending)
            while pending or running:
                # Fill the running queue with new jobs
                while len(running) < self.process_count:
                    if not pending:
                        break
                    index, args = pending.pop(0)
                    pipe_parent, pipe_child = Pipe(False)
                    process = Process(
                        target=Pool.wrap_pipe(pipe_child, index, function),
                        args=(args, ))
                    process.start()
                    running.append((index, process, pipe_parent))
                # Wait for jobs to finish
                for pipe in wait(list(map(lambda t: t[2], running))):
                    index, result = pipe.recv()
                    # Remove the finished job from the running list
                    running = list(filter(lambda x: x[0] != index, running))
                    # Add the result to the finished list
                    finished[index] = result
            return finished
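Usage mirrors multiprocessing.Pool for the map case. A hypothetical example reusing BUCKET and KEYS from the question (fetch_key is an assumed helper, not part of the class):

    import boto3

    def fetch_key(key):
        """Download one S3 object and return its body (assumed helper)."""
        s3 = boto3.client('s3')
        return s3.get_object(Bucket=BUCKET, Key=key)['Body'].read()

    with Pool(process_count=8) as pool:
        bodies = pool.map(fetch_key, KEYS)

Each job gets its own one-way Pipe for returning its result, so no shared-memory queue (and therefore no /dev/shm) is needed.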