如何在 multiprocessing.Pool 中运行的函数中使用模拟

amo*_*fis 7 python unit-testing mocking python-3.x python-multiprocessing

在我的代码中,我使用 multiprocessing.Pool 来同时运行一些代码。简化的代码看起来有点像这样:

    class Wrapper():
        session: Session       


        def __init__(self):
            self.session = requests.Session()
            # Session initialization


        def upload_documents(docs):
            with Pool(4) as pool:
                upload_file = partial(self.upload_document)
                pool.starmap(upload_file, documents)

                summary = create_summary(documents)

            self.upload_document(summary)


        def upload_document(doc):
            self.post(doc)


        def post(data):
            self.session.post(self.url, data, other_params)
Run Code Online (Sandbox Code Playgroud)

所以基本上通过 HTTP 发送文档是并行的。现在我想测试这段代码,但无法做到。这是我的测试:

    @patch.object(Session, 'post')
    def test_study_upload(self, post_mock):
        response_mock = Mock()
        post_mock.return_value = response_mock
        response_mock.ok = True

        with Wrapper() as wrapper:
            wrapper.upload_documents(documents)

        mc = post_mock.mock_calls
Run Code Online (Sandbox Code Playgroud)

在调试中我可以检查模拟调用。有一个看起来有效,它是上传摘要的那个,以及一堆诸如call.json()call.__len__()call.__str__()调用。

没有电话上传文件。当我在方法中设置断点时upload_document,我可以看到每个文档都调用一次它,它按预期工作。但是,我无法测试它,因为我无法通过模拟验证此行为。我认为这是因为有许多进程调用同一个模拟,但仍然 - 我该如何解决这个问题?

我使用Python 3.6

who*_*ski 1

我在这里采取的方法是让您的测试尽可能精细并模拟其他调用。在这种情况下,您需要模拟您的Pool对象并验证它是否正在调用您所期望的内容,而不是实际上依赖它在测试期间启动子进程。这就是我的想法:

@patch('yourmodule.Pool')
def test_study_upload(self, mock_pool_init):
    mock_pool_instance = mock_pool_init.return_value.__enter__.return_value

    with Wrapper() as wrapper:
        wrapper.upload_documents(documents)

    # To get the upload file arg here, you'll need to either mock the partial call here, 
    # or actually call it and get the return value
    mock_pool_instance.starmap.assert_called_once_with_args(upload_file, documents)
Run Code Online (Sandbox Code Playgroud)

然后,您需要采用现有逻辑并单独测试 upload_document 函数:

@patch.object(Session, 'post')
def test_upload_file(self, post_mock):
    response_mock = Mock()
    post_mock.return_value = response_mock
    response_mock.ok = True

    with Wrapper() as wrapper:
        wrapper.upload_document(document)

    mc = post_mock.mock_calls
Run Code Online (Sandbox Code Playgroud)

这使您可以覆盖创建和控制池的函数以及池实例调用的函数。警告这一点,我没有对此进行测试,但我留下了一些内容供您填写空白,因为它看起来像是您原始问题中实际模块的缩写版本。

编辑:

尝试这个:

def test_study_upload(self):
    def call_direct(func_var, documents):
        return func_var(documents)

    with patch('yourmodule.Pool.starmap', new=call_direct)
        with Wrapper() as wrapper:
            wrapper.upload_documents(documents)
Run Code Online (Sandbox Code Playgroud)

这是修补星图调用,以便它直接调用您传入的函数。它完全绕过池;最重要的是,您无法真正深入研究由多处理创建的那些子流程。