如何在Python中测试Celery应用程序中的Retry?

toy*_*toy 8 python celery python-mock

我正在尝试测试应用程序是否正在重试.

@celery.task(bind=False, default_retry_delay=30)
def convert_video(gif_url, webhook):
    // doing something
    VideoManager().convert(gif_url)
    return
    except Exception as exc:
         raise convert_video.retry(exc=exc)
Run Code Online (Sandbox Code Playgroud)

我正在嘲笑这个测试

@patch('src.video_manager.VideoManager.convert')
@patch('requests.post')
def test_retry_failed_task(self, mock_video_manager, mock_requests):
    mock_video_manager.return_value= {'webm':'file.webm', 'mp4':'file.mp4', 'ogv' : 'file.ogv', 'snapshot':'snapshot.png'}
    mock_video_manager.side_effect = Exception('some error')

    server.convert_video.retry = MagicMock()

    server.convert_video('gif_url', 'http://www.company.com/webhook?attachment_id=1234')
    server.convert_video.retry.assert_called_with(ANY)
Run Code Online (Sandbox Code Playgroud)

我收到了这个错误

TypeError:exception必须是旧式类或派生自BaseException,而不是MagicMock

这是显而易见的,但我不知道该怎么做,否则测试是否正在调用该方法.

moo*_*odh 9

我还没有使用内置的重试来让它工作,所以我必须使用具有真正重试副作用的模拟,这使得可以在测试中捕获它。我是这样做的:

from celery.exceptions import Retry
from mock import MagicMock
from nose.plugins.attrib import attr

# Set it for for every task-call (or per task below with @patch)
task.retry = MagicMock(side_effect=Retry)

#@patch('task.retry', MagicMock(side_effect=Retry)
def test_task(self):
    with assert_raises(Retry):
        task() # Note, no delay or things like that

# and the task, I don't know if it works without bind.
@Celery.task(bind=True)
def task(self):
   raise self.retry()
Run Code Online (Sandbox Code Playgroud)

如果有人知道我如何摆脱嘲笑重试“异常”的额外步骤,我会很高兴听到它!