dec*_*eze 3 python unit-testing tornado python-3.x python-asyncio
我有一个基于异步的类,我想对其进行单元测试。使用tornado.testing.AsyncTestCase此工具非常容易。但是,我班上的一个特定方法asyncio.ensure_future用来安排另一个方法的执行。这永远不会在中完成AsyncTestCase,因为默认的测试运行器使用龙卷风KQueueIOLoop事件循环,而不是asyncio事件循环。
class TestSubject:
def foo(self):
asyncio.ensure_future(self.bar())
async def bar(self):
pass
Run Code Online (Sandbox Code Playgroud)
class TestSubjectTest(AsyncTestCase):
def test_foo(self):
t = TestSubject()
# here be somewhat involved setup with MagicMock and self.stop
t.foo()
self.wait()
Run Code Online (Sandbox Code Playgroud)
$ python -m tornado.testing baz.testsubject_test
...
[E 160627 17:48:22 testing:731] FAIL
[E 160627 17:48:22 base_events:1090] Task was destroyed but it is pending!
task: <Task pending coro=<TestSubject.bar() running at ...>>
.../asyncio/base_events.py:362: RuntimeWarning: coroutine 'TestSubject.bar' was never awaited
Run Code Online (Sandbox Code Playgroud)
如何使用其他事件循环来运行测试以确保我的任务将被实际执行?或者,如何使我的实现事件独立于循环和交叉兼容?
事实证明很简单...
class TestSubjectTest(AsyncTestCase):
def get_new_ioloop(self): # override this method
return tornado.platform.asyncio.AsyncIOMainLoop()
Run Code Online (Sandbox Code Playgroud)
我以前尝试过此方法,但直接返回asyncio.get_event_loop(),这没有用。返回龙卷风的异步循环包装器就可以了。