iAm*_*ric 8 python testing integration-testing azure-iot-hub azure-iot-edge
我有一个要测试的 python 项目。我已经有了单元测试,unittest但我需要做集成测试。
为此,我有两个应用程序:我必须测试的真实应用程序和一个“测试”应用程序,它将向第一个应用程序发送请求,等待响应,然后将其与预期结果进行比较:
这样我就可以测试应用程序是否正确响应请求。
目前,我有我上面描述的内容,但在 main.py(不是特定的测试文件)中。另外,比较只是用打印功能完成的,所以我可以看到它的工作原理。但是我必须执行这些测试并能够以常规格式(例如 junit xml)获得结果。
我如何编写、运行和获取这些测试的结果?
编辑
我正在开发 Azure IoT Edge 模块,并且正在使用 Route 来连接模块。这是测试模块的代码,我需要在其中执行测试:
import random
import time
import sys
import iothub_client
import json
# pylint: disable=E0611
from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError
# Callback received when the message that we're forwarding is processed.
def send_confirmation_callback(message, result, user_context):
print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
# receive_message_callback is invoked when an incoming message arrives on INPUT queue
def receive_message_callback(message, hubManager):
message_buffer = message.get_bytearray()
size = len(message_buffer)
message_text = message_buffer[:size].decode('utf-8')
data = json.loads(message_text)
result = data["result"]
print ("expected_result: %d; result: %d ==> %r" %(EXPECTED_RESULT, result, EXPECTED_RESULT==result))
class HubManager(object):
def __init__(self, protocol=IoTHubTransportProvider.MQTT):
self.client_protocol = protocol
self.client = IoTHubModuleClient()
self.client.create_from_environment(protocol)
self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
# sets the callback when a message arrives on INPUT queue.
self.client.set_message_callback(INPUT, receive_message_callback, self)
# Forwards the message received onto the next stage in the process.
def forward_event_to_output(self, outputQueueName, event, send_context):
self.client.send_event_async(
outputQueueName, event, send_confirmation_callback, send_context)
def main(protocol):
try:
hub_manager = HubManager(protocol)
# Send request
message = "{\"param1\": %d,\"param2\": %d}" % (PARAM_1, PARAM_2)
msg_txt_formatted = IoTHubMessage(message)
hub_manager.forward_event_to_output(OUTPUT, msg_txt_formatted, 0)
while True:
time.sleep(1)
except IoTHubError as iothub_error:
print ( "Unexpected error %s from IoTHub" % iothub_error )
return
except KeyboardInterrupt:
print ( "IoTHubModuleClient sample stopped" )
if __name__ == '__main__':
main(PROTOCOL)
Run Code Online (Sandbox Code Playgroud)
小智 6
这在一定程度上取决于您正在测试的应用程序。假设您的应用程序是一个 API,那么集成测试可以简单地调用不同的端点并比较结果。
您可以使用适当的 write_result 函数创建一个 IntegrationTest 类。
所以一个函数可以是:
def test_get_users_status(self):
expected_result = { "user1": "active", "user2": "inactive" }
r = requests.get('https://localhost:8080/get_users_status', auth=('user', 'pass'))
assert r.status_code is 200
self.write_json(expected_result, r.json())
Run Code Online (Sandbox Code Playgroud)
您可以在集成类中拥有一个函数来运行所有测试、所有与身份验证相关的测试等。
| 归档时间: |
|
| 查看次数: |
11864 次 |
| 最近记录: |