Python 3 asyncio with aioboto3 seems sequential

New*_*biZ 5 python amazon-web-services async-await python-asyncio

I am porting a simple Python 3 script to AWS Lambda. The script itself is simple: it gathers information from a dozen S3 objects and returns the results.

The script uses multiprocessing.Pool to gather all the files in parallel, but multiprocessing cannot be used in an AWS Lambda environment because /dev/shm is missing there. So, rather than writing a dirty multiprocessing.Process/multiprocessing.Queue replacement, I thought I would give asyncio a try.
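For anyone wondering what exactly breaks: a plain multiprocessing.Process still works on Lambda, but anything backed by a POSIX semaphore (Queue, Pool, Lock) fails at construction time because /dev/shm is not mounted. A minimal repro, assuming a Lambda runtime:

from multiprocessing import Queue

# On AWS Lambda this raises at construction time with
#   OSError: [Errno 38] Function not implemented
# because the POSIX semaphore behind the Queue needs /dev/shm
q = Queue()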

I am using the latest version of aioboto3 (8.0.5) on Python 3.8.
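(Note for future readers: the module-level aioboto3.client() used below is the 8.x API; newer aioboto3 releases, 9.x onwards as far as I know, create clients from a Session instead. A sketch of the newer pattern, in case you are on a recent version:)

import aioboto3

async def newer_aioboto3_client():
    # aioboto3 9.x+ pattern (assumption; check your installed version)
    session = aioboto3.Session()
    async with session.client('s3') as s3:
        ...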

My problem is that I cannot see any improvement between a naive sequential download of the files and a download multiplexed on the asyncio event loop.

Here are the two versions of my code.

import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import boto3
import aioboto3

BUCKET = 'some-bucket'
KEYS = [
    'some/key/1',
    [...]
    'some/key/10',
]

async def download_aio():
    """Concurrent download of all objects from S3"""
    async with aioboto3.client('s3') as s3:
        objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
        objects = await asyncio.gather(*objects)
        buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

def download():
    """Sequentially download all objects from S3"""
    s3 = boto3.client('s3')
    for key in KEYS:
        obj = s3.get_object(Bucket=BUCKET, Key=key)
        obj['Body'].read()

def run_sequential():
    download()

def run_concurrent():
    loop = asyncio.get_event_loop()
    #loop.set_default_executor(ProcessPoolExecutor(10))
    #loop.set_default_executor(ThreadPoolExecutor(10))
    loop.run_until_complete(download_aio())
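For reference, the comparison below can be reproduced by wall-clocking the two entry points with a small harness like this one (timeit is a helper name I made up, not part of the script):

import time

def timeit(label, fn):
    # Wall-clock one download strategy end to end
    start = time.perf_counter()
    fn()
    print(f'{label}: {time.perf_counter() - start:.2f}s')

timeit('sequential', run_sequential)
timeit('concurrent', run_concurrent)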

The timings of run_sequential() and run_concurrent() are very similar (around 3 seconds for a dozen 10MB files). I am convinced the concurrent version is not actually concurrent, for several reasons:

  • I tried switching to a Process/ThreadPoolExecutor, and the processes/threads are spawned for the duration of the function even though they end up doing nothing
  • The sequential and concurrent timings are very close, even though my network interface is definitely not saturated and the CPU is not bound either
  • The time taken by the concurrent version grows linearly with the number of files (see the diagnostic sketch right after this list).
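One way to check that suspicion is to stamp each request with its start and end times; if the printed intervals do not overlap, the downloads are effectively running one by one. A minimal diagnostic sketch reusing BUCKET and KEYS from above (download_one and download_aio_timed are names I made up):

import time
import asyncio
import aioboto3

async def download_one(s3, key):
    start = time.perf_counter()
    obj = await s3.get_object(Bucket=BUCKET, Key=key)
    await obj['Body'].read()
    # Non-overlapping [start, end] intervals mean the downloads ran sequentially
    print(f'{key}: {start:.2f} -> {time.perf_counter():.2f}')

async def download_aio_timed():
    async with aioboto3.client('s3') as s3:
        await asyncio.gather(*(download_one(s3, k) for k in KEYS))

asyncio.get_event_loop().run_until_complete(download_aio_timed())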

I am sure I am missing something, but I just cannot wrap my head around what.

Any ideas?

New*_*biZ 6

After spending a few hours trying to understand how to use aioboto3 properly, I decided to just switch to my backup solution. I ended up rolling my own naive version of multiprocessing.Pool for use within an AWS Lambda environment.

If someone stumbles across this thread in the future, here it is. It is far from perfect, but easy enough to drop in as an as-is replacement for multiprocessing.Pool in my simple case.

from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


class Pool:
    """Naive implementation of a process pool with mp.Pool API.

    This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
    is not mounted in an AWS Lambda environment.
    """

    def __init__(self, process_count=1):
        assert process_count >= 1
        self.process_count = process_count

    @staticmethod
    def wrap_pipe(pipe, index, func):
        def wrapper(args):
            try:
                result = func(args)
            except Exception as exc:  # pylint: disable=broad-except
                result = exc
            pipe.send((index, result))
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def map(self, function, arguments):
        pending = list(enumerate(arguments))
        running = []
        finished = [None] * len(pending)
        while pending or running:
            # Fill the running queue with new jobs
            while len(running) < self.process_count:
                if not pending:
                    break
                index, args = pending.pop(0)
                pipe_parent, pipe_child = Pipe(False)
                process = Process(
                    target=Pool.wrap_pipe(pipe_child, index, function),
                    args=(args, ))
                process.start()
                running.append((index, process, pipe_parent))
            # Wait for at least one job to finish
            for pipe in wait([job[2] for job in running]):
                index, result = pipe.recv()
                # Reap the finished process before dropping it
                for job_index, process, _ in running:
                    if job_index == index:
                        process.join()
                # Remove the finished job from the running list
                running = [job for job in running if job[0] != index]
                # Add the result to the finished list
                finished[index] = result

        return finished
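For completeness, here is the kind of call site it supports, mirroring the multiprocessing.Pool.map API (download_object is a hypothetical worker; BUCKET and KEYS are the same as in the question):

import boto3

def download_object(key):
    # Each worker process creates its own client
    s3 = boto3.client('s3')
    body = s3.get_object(Bucket=BUCKET, Key=key)['Body'].read()
    return len(body)

with Pool(process_count=8) as pool:
    sizes = pool.map(download_object, KEYS)
# Note: exceptions raised in workers are returned in place of results
print(sizes)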