正确模拟另一个芹菜任务中正在调用的芹菜任务

sca*_*one 3 python mocking python-2.7

如何正确模拟在另一个芹菜任务中调用的芹菜任务?(下面的虚拟代码)

@app.task
def task1(smthg):
    do_so_basic_stuff_1
    do_so_basic_stuff_2
    other_thing(smthg)

@app.task
def task2(smthg):
    if condition:
        task1.delay(smthg[1])
    else:
        task1.delay(smthg)
Run Code Online (Sandbox Code Playgroud)

我在my_module中确实具有完全相同的代码结构。proj / cel / my_module.py我正在尝试在proj / tests / cel_test / test.py中编写测试

测试功能:

def test_this_thing(self):
    # firs I want to mock task1
    # i've tried to import it from my_module.py to test.py and then mock it from test.py namespace 
    # i've tried to import it from my_module.py and mock it
    # nothing worked for me

    # what I basically want to do 
    # mock task1 here
    # and then run task 2 (synchronous)
    task2.apply()
    # and then I want to check if task one was called 
    self.assertTrue(mocked_task1.called)
Run Code Online (Sandbox Code Playgroud)

kch*_*ski 6

您不是在调用task1()task2(),而是在调用它们的方法:delay()apply()-因此您需要测试是否调用了这些方法。

这是我根据您的代码编写的一个有效示例:

task.py

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def task1():
    return 'task1'

@app.task
def task2():
    task1.delay()
Run Code Online (Sandbox Code Playgroud)

test.py

from tasks import task2

def test_task2(mocker):
    mocked_task1 = mocker.patch('tasks.task1')
    task2.apply()
    assert mocked_task1.delay.called
Run Code Online (Sandbox Code Playgroud)

检测结果:

$ pytest -vvv test.py
============================= test session starts ==============================
platform linux -- Python 3.5.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- /home/kris/.virtualenvs/3/bin/python3
cachedir: .cache
rootdir: /home/kris/projects/tmp, inifile:
plugins: mock-1.6.2, celery-4.1.0
collected 1 item                                                                

test.py::test_task2 PASSED

=========================== 1 passed in 0.02 seconds ===========================
Run Code Online (Sandbox Code Playgroud)