python中的集成测试

iAm*_*ric 8 python testing integration-testing azure-iot-hub azure-iot-edge

我有一个要测试的 python 项目。我已经有了单元测试,unittest但我需要做集成测试。

为此,我有两个应用程序:我必须测试的真实应用程序和一个“测试”应用程序,它将向第一个应用程序发送请求,等待响应,然后将其与预期结果进行比较:

在此处输入图片说明

这样我就可以测试应用程序是否正确响应请求。

目前,我有我上面描述的内容,但在 main.py(不是特定的测试文件)中。另外,比较只是用打印功能完成的,所以我可以看到它的工作原理。但是我必须执行这些测试并能够以常规格式(例如 junit xml)获得结果。

我如何编写、运行和获取这些测试的结果?

编辑

我正在开发 Azure IoT Edge 模块,并且正在使用 Route 来连接模块。这是测试模块的代码,我需要在其中执行测试:

import random
import time
import sys
import iothub_client
import json

# pylint: disable=E0611
from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError

# Callback received when the message that we're forwarding is processed.
def send_confirmation_callback(message, result, user_context):
    print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )

# receive_message_callback is invoked when an incoming message arrives on INPUT queue
def receive_message_callback(message, hubManager):
    message_buffer = message.get_bytearray()
    size = len(message_buffer)
    message_text = message_buffer[:size].decode('utf-8')
    data = json.loads(message_text)
    result = data["result"]
    print ("expected_result: %d; result: %d ==> %r" %(EXPECTED_RESULT, result, EXPECTED_RESULT==result))

class HubManager(object):
    def __init__(self, protocol=IoTHubTransportProvider.MQTT):
        self.client_protocol = protocol
        self.client = IoTHubModuleClient()
        self.client.create_from_environment(protocol)
        self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
        # sets the callback when a message arrives on INPUT queue.
        self.client.set_message_callback(INPUT, receive_message_callback, self)

    # Forwards the message received onto the next stage in the process.
    def forward_event_to_output(self, outputQueueName, event, send_context):
        self.client.send_event_async(
            outputQueueName, event, send_confirmation_callback, send_context)

def main(protocol):
    try:
        hub_manager = HubManager(protocol)

        # Send request 
        message = "{\"param1\": %d,\"param2\": %d}" % (PARAM_1, PARAM_2)
        msg_txt_formatted = IoTHubMessage(message)
        hub_manager.forward_event_to_output(OUTPUT, msg_txt_formatted, 0)

        while True:
            time.sleep(1)

    except IoTHubError as iothub_error:
        print ( "Unexpected error %s from IoTHub" % iothub_error )
        return
    except KeyboardInterrupt:
        print ( "IoTHubModuleClient sample stopped" )

if __name__ == '__main__':
    main(PROTOCOL)
Run Code Online (Sandbox Code Playgroud)

小智 6

这在一定程度上取决于您正在测试的应用程序。假设您的应用程序是一个 API,那么集成测试可以简单地调用不同的端点并比较结果。

您可以使用适当的 write_result 函数创建一个 IntegrationTest 类。

所以一个函数可以是:

def test_get_users_status(self):
    expected_result = { "user1": "active",  "user2": "inactive" }
    r = requests.get('https://localhost:8080/get_users_status', auth=('user', 'pass'))
    assert r.status_code is 200
    self.write_json(expected_result, r.json())
Run Code Online (Sandbox Code Playgroud)

您可以在集成类中拥有一个函数来运行所有测试、所有与身份验证相关的测试等。