sam*_*sam 8 python unit-testing mocking amazon-s3 boto3
我正在对一个函数进行单元测试,该函数将元素从 S3 对象转换为 pandas DataFrame,并且需要模拟从 boto3 返回的 StreamingBody 对象
文件.py
def object_to_df(self, key_name, dtypes):
s3_object = self.get_object(key_name=key_name)
if s3_object is not None:
object_df = pandas.read_csv(
io.BytesIO(s3_object["Body"].read()), dtype=dtypes
)
return object_df
Run Code Online (Sandbox Code Playgroud)
self.get_object(key_name) 的响应记录在此处
{
'Body': StreamingBody(),
'DeleteMarker': True|False,
'AcceptRanges': 'string',
...
}
Run Code Online (Sandbox Code Playgroud)
所以我需要模拟 StreamingBody() 对象并让我的模拟函数返回该对象。
测试.py
{
'Body': StreamingBody(),
'DeleteMarker': True|False,
'AcceptRanges': 'string',
...
}
Run Code Online (Sandbox Code Playgroud)
但我遇到了TypeError: 'StreamingBody' object is not subscriptable
有什么提示吗?
S3 客户端返回一个字典,并且您模拟的 S3 客户端返回一个 StreamingBody。你模拟的 S3 客户端应该返回类似的内容
body_json = {
'Body': [
{'Candidate': 'Black Panther', 'Votes': 3},
{'Candidate': 'Captain America: Civil War', 'Votes': 8},
{'Candidate': 'Guardians of the Galaxy', 'Votes': 8},
{'Candidate': "Thor: Ragnarok", 'Votes': 1},
]
}
body_encoded = json.dumps(body_json).encode('utf-8')
body = StreamingBody(
StringIO(body_encoded),
len(body_encoded)
)
mocked_response = {
'Body': body,
...
}
mock_get_object.return_value = mocked_response
Run Code Online (Sandbox Code Playgroud)
小智 5
下面的代码对我有用。参考答案:/sf/answers/4524970341/
import json
from botocore.response import StreamingBody
import io
body_json = {
'Body': [
{'Candidate': 'Black Panther', 'Votes': 3},
{'Candidate': 'Captain America: Civil War', 'Votes': 8},
{'Candidate': 'Guardians of the Galaxy', 'Votes': 8},
{'Candidate': "Thor: Ragnarok", 'Votes': 1}
]
}
body_encoded = json.dumps(body_json).encode()
body = StreamingBody(
io.BytesIO(body_encoded),
len(body_encoded)
)
mocked_response = {
'Body': body,
...
}
mock_get_object.return_value = mocked_response
Run Code Online (Sandbox Code Playgroud)