our*_*ord 13 python asynchronous python-asyncio
我对如何asyncio在Python 3.4中使用该模块感到困惑.我有一个searching搜索引擎的API,并希望每个搜索请求可以并行或异步运行,这样我就不必等待一次搜索完成另一次搜索.
这是我的高级搜索API,用于使用原始搜索结果构建一些对象.搜索引擎本身正在使用某种asyncio机制,所以我不会打扰它.
# No asyncio module used here now
class search(object):
...
self.s = some_search_engine()
...
def searching(self, *args, **kwargs):
ret = {}
# do some raw searching according to args and kwargs and build the wrapped results
...
return ret
Run Code Online (Sandbox Code Playgroud)
为了尝试异步请求,我编写了以下测试用例来测试如何将我的东西与asyncio模块进行交互.
# Here is my testing script
@asyncio.coroutine
def handle(f, *args, **kwargs):
r = yield from f(*args, **kwargs)
return r
s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(handle(s.searching, arg1, arg2, ...))
loop.close()
Run Code Online (Sandbox Code Playgroud)
通过运行pytest,RuntimeError: Task got bad yield : {results from searching...}当它到达线时它将返回a r = yield from ....
我也试过另一种方式.
# same handle as above
def handle(..):
....
s = search()
loop = asyncio.get_event_loop()
tasks = [
asyncio.async(handle(s.searching, arg11, arg12, ...)),
asyncio.async(handle(s.searching, arg21, arg22, ...)),
...
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Run Code Online (Sandbox Code Playgroud)
通过pytest运行这个测试用例,它会通过,但搜索引擎会产生一些奇怪的异常.它说Future/Task exception was never retrieved.
我想问的事情:
yield from通过返回函数调用的实际结果是正确的使用方法吗?search需要包含loop = get_event_loop()这类内容来同步请求吗?dan*_*ano 17
问题是你不能只调用现有的同步代码,就像它是一个asyncio.coroutine并获得异步行为一样.当你调用时yield from searching(...),你只会获得异步行为,如果searching它本身实际是一个asyncio.coroutine,或者至少返回一个asyncio.Future.现在,searching它只是一个常规的同步函数,所以调用yield from searching(...)只会抛出一个错误,因为它不返回一个Future或协同程序.
要获得所需的行为searching,除了synchronous版本之外,您还需要具有异步版本(或者如果您不需要,则只需删除同步版本).您有几个选项可以支持两者:
searching为asyncio.coroutine它使用asyncio兼容的调用来执行其I/O,而不是阻止I/O. 这将使其在asyncio上下文中工作,但这意味着您将无法再在同步上下文中直接调用它.相反,您还需要提供另searching一种启动asyncio事件循环和调用的同步方法return loop.run_until_complete(self.searching(...)).有关详细信息,请参阅此问题.保持同步实现searching,并提供一个替代的异步API,用于在后台线程中BaseEventLoop.run_in_executor运行您的searching方法:
class search(object):
...
self.s = some_search_engine()
...
def searching(self, *args, **kwargs):
ret = {}
...
return ret
@asyncio.coroutine
def searching_async(self, *args, **kwargs):
loop = kwargs.get('loop', asyncio.get_event_loop())
try:
del kwargs['loop'] # assuming searching doesn't take loop as an arg
except KeyError:
pass
r = yield from loop.run_in_executor(None, self.searching, *args) # Passing None tells asyncio to use the default ThreadPoolExecutor
return r
Run Code Online (Sandbox Code Playgroud)
测试脚本:
s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(s.searching_async(arg1, arg2, ...))
loop.close()
Run Code Online (Sandbox Code Playgroud)
这样,您可以保持同步代码不变,至少提供可以在asyncio代码中使用的方法而不会阻塞事件循环.它不是像你在代码中实际使用异步I/O那样干净的解决方案,但它总比没有好.
searching,一个使用阻塞I/O,另一个使用asyncio兼容的版本.这为两个上下文提供了理想的实现,但需要两倍的工作.