Bel*_*fon 4 python aiohttp moto
我有一个项目,它使用 aiohttp 和 aiobotocore 来处理 AWS 中的资源。我正在尝试测试适用于 AWS S3 的类,并且我正在使用 moto 来模拟 AWS。对于使用同步代码的示例(来自 moto 文档的示例),模拟工作得很好
import boto3
from moto import mock_s3
class MyModel(object):
def __init__(self, name, value):
self.name = name
self.value = value
def save(self):
s3 = boto3.client('s3', region_name='us-east-1')
s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)
def test_my_model_save():
with mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')
model_instance = MyModel('steve', 'is awesome')
model_instance.save()
body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")
assert body == 'is awesome'
Run Code Online (Sandbox Code Playgroud)
但是,在重写它以使用 aiobotocore 模拟后不起作用 - 在我的示例中它连接到真正的 AWS S3。
import aiobotocore
import asyncio
import boto3
from moto import mock_s3
class MyModel(object):
def __init__(self, name, value):
self.name = name
self.value = value
async def save(self, loop):
session = aiobotocore.get_session(loop=loop)
s3 = session.create_client('s3', region_name='us-east-1')
await s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)
def test_my_model_save():
with mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')
loop = asyncio.get_event_loop()
model_instance = MyModel('steve', 'is awesome')
loop.run_until_complete(model_instance.save(loop=loop))
body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")
assert body == 'is awesome'
Run Code Online (Sandbox Code Playgroud)
所以我的假设是 moto 不能与 aiobotocore 一起正常工作。如果我的源代码类似于第二个示例,我如何有效地模拟 AWS 资源?
来自的模拟moto不起作用,因为它们使用同步 API。但是你可以启动 moto 服务器并配置 aiobotocore 来连接到这个测试服务器。查看 aiobotocore 测试以获取灵感。
| 归档时间: |
|
| 查看次数: |
2924 次 |
| 最近记录: |