如何使用 aiobotocore 模拟 AWS S3

Bel*_*fon 4 python aiohttp moto

我有一个项目,它使用 aiohttp 和 aiobotocore 来处理 AWS 中的资源。我正在尝试测试适用于 AWS S3 的类,并且我正在使用 moto 来模拟 AWS。对于使用同步代码的示例(来自 moto 文档的示例),模拟工作得很好

import boto3
from moto import mock_s3


class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def save(self):
        s3 = boto3.client('s3', region_name='us-east-1')
        s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='mybucket')

        model_instance = MyModel('steve', 'is awesome')
        model_instance.save()
        body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")

        assert body == 'is awesome'
Run Code Online (Sandbox Code Playgroud)

但是,在重写它以使用 aiobotocore 模拟后不起作用 - 在我的示例中它连接到真正的 AWS S3。

import aiobotocore
import asyncio

import boto3
from moto import mock_s3


class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    async def save(self, loop):
        session = aiobotocore.get_session(loop=loop)
        s3 = session.create_client('s3', region_name='us-east-1')
        await s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='mybucket')
        loop = asyncio.get_event_loop()

        model_instance = MyModel('steve', 'is awesome')
        loop.run_until_complete(model_instance.save(loop=loop))
        body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")

        assert body == 'is awesome'
Run Code Online (Sandbox Code Playgroud)

所以我的假设是 moto 不能与 aiobotocore 一起正常工作。如果我的源代码类似于第二个示例,我如何有效地模拟 AWS 资源?

And*_*lov 5

来自的模拟moto不起作用,因为它们使用同步 API。但是你可以启动 moto 服务器并配置 aiobotocore 来连接到这个测试服务器。查看 aiobotocore 测试以获取灵感。