toy*_*toy 8 python celery python-mock
我正在尝试测试应用程序是否正在重试.
@celery.task(bind=False, default_retry_delay=30)
def convert_video(gif_url, webhook):
// doing something
VideoManager().convert(gif_url)
return
except Exception as exc:
raise convert_video.retry(exc=exc)
Run Code Online (Sandbox Code Playgroud)
我正在嘲笑这个测试
@patch('src.video_manager.VideoManager.convert')
@patch('requests.post')
def test_retry_failed_task(self, mock_video_manager, mock_requests):
mock_video_manager.return_value= {'webm':'file.webm', 'mp4':'file.mp4', 'ogv' : 'file.ogv', 'snapshot':'snapshot.png'}
mock_video_manager.side_effect = Exception('some error')
server.convert_video.retry = MagicMock()
server.convert_video('gif_url', 'http://www.company.com/webhook?attachment_id=1234')
server.convert_video.retry.assert_called_with(ANY)
Run Code Online (Sandbox Code Playgroud)
我收到了这个错误
TypeError:exception必须是旧式类或派生自BaseException,而不是MagicMock
这是显而易见的,但我不知道该怎么做,否则测试是否正在调用该方法.
我还没有使用内置的重试来让它工作,所以我必须使用具有真正重试副作用的模拟,这使得可以在测试中捕获它。我是这样做的:
from celery.exceptions import Retry
from mock import MagicMock
from nose.plugins.attrib import attr
# Set it for for every task-call (or per task below with @patch)
task.retry = MagicMock(side_effect=Retry)
#@patch('task.retry', MagicMock(side_effect=Retry)
def test_task(self):
with assert_raises(Retry):
task() # Note, no delay or things like that
# and the task, I don't know if it works without bind.
@Celery.task(bind=True)
def task(self):
raise self.retry()
Run Code Online (Sandbox Code Playgroud)
如果有人知道我如何摆脱嘲笑重试“异常”的额外步骤,我会很高兴听到它!
| 归档时间: |
|
| 查看次数: |
2778 次 |
| 最近记录: |