Python asyncio任务收益率很低

our*_*ord 13 python asynchronous python-asyncio

我对如何asyncio在Python 3.4中使用该模块感到困惑.我有一个searching搜索引擎的API,并希望每个搜索请求可以并行或异步运行,这样我就不必等待一次搜索完成另一次搜索.

这是我的高级搜索API,用于使用原始搜索结果构建一些对象.搜索引擎本身正在使用某种asyncio机制,所以我不会打扰它.

# No asyncio module used here now
class search(object):
  ...
  self.s = some_search_engine()
  ...
  def searching(self, *args, **kwargs):
    ret = {}
    # do some raw searching according to args and kwargs and build the wrapped results
    ...
    return ret
Run Code Online (Sandbox Code Playgroud)

为了尝试异步请求,我编写了以下测试用例来测试如何将我的东西与asyncio模块进行交互.

# Here is my testing script
@asyncio.coroutine
def handle(f, *args, **kwargs):
  r = yield from f(*args, **kwargs)
  return r

s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(handle(s.searching, arg1, arg2, ...))
loop.close()
Run Code Online (Sandbox Code Playgroud)

通过运行pytest,RuntimeError: Task got bad yield : {results from searching...}当它到达线时它将返回a r = yield from ....

我也试过另一种方式.

# same handle as above
def handle(..):
  ....
s = search()
loop = asyncio.get_event_loop()
tasks = [
        asyncio.async(handle(s.searching, arg11, arg12, ...)),
        asyncio.async(handle(s.searching, arg21, arg22, ...)),
        ...
        ]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Run Code Online (Sandbox Code Playgroud)

通过pytest运行这个测试用例,它会通过,但搜索引擎会产生一些奇怪的异常.它说Future/Task exception was never retrieved.

我想问的事情:

  1. 对于我的第一次尝试,yield from通过返回函数调用的实际结果是正确的使用方法吗?
  2. 我想我需要在我的第二个测试用例中添加一些睡眠来等待任务完成,但是我应该怎么做呢?如何在第二个测试用例中调用函数调用?
  3. 通过创建处理请求的异步处理程序,这是使用现有模块实现asyncio的好方法吗?
  4. 如果问题2的答案为否,那么每个客户端调用该类search需要包含loop = get_event_loop()这类内容来同步请求吗?

dan*_*ano 17

问题是你不能只调用现有的同步代码,就像它是一个asyncio.coroutine并获得异步行为一样.当你调用时yield from searching(...),你只会获得异步行为,如果searching它本身实际是一个asyncio.coroutine,或者至少返回一个asyncio.Future.现在,searching它只是一个常规的同步函数,所以调用yield from searching(...)只会抛出一个错误,因为它不返回一个Future或协同程序.

要获得所需的行为searching,除了synchronous版本之外,您还需要具有异步版本(或者如果您不需要,则只需删除同步版本).您有几个选项可以支持两者:

  1. 重写searchingasyncio.coroutine它使用asyncio兼容的调用来执行其I/O,而不是阻止I/O. 这将使其在asyncio上下文中工作,但这意味着您将无法再在同步上下文中直接调用它.相反,您还需要提供另searching一种启动asyncio事件循环和调用的同步方法return loop.run_until_complete(self.searching(...)).有关详细信息,请参阅此问题.
  2. 保持同步实现searching,并提供一个替代的异步API,用于在后台线程中BaseEventLoop.run_in_executor运行您的searching方法:

    class search(object):
      ...
      self.s = some_search_engine()
      ...
      def searching(self, *args, **kwargs):
        ret = {}
        ...
        return ret
    
       @asyncio.coroutine
       def searching_async(self, *args, **kwargs):
          loop = kwargs.get('loop', asyncio.get_event_loop())
          try:
              del kwargs['loop']  # assuming searching doesn't take loop as an arg
          except KeyError:
              pass
          r = yield from loop.run_in_executor(None, self.searching, *args)  # Passing None tells asyncio to use the default ThreadPoolExecutor
          return r
    
    Run Code Online (Sandbox Code Playgroud)

    测试脚本:

    s = search()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(s.searching_async(arg1, arg2, ...))
    loop.close()
    
    Run Code Online (Sandbox Code Playgroud)

    这样,您可以保持同步代码不变,至少提供可以在asyncio代码中使用的方法而不会阻塞事件循环.它不是像你在代码中实际使用异步I/O那样干净的解决方案,但它总比没有好.

  3. 提供两个完全独立的版本searching,一个使用阻塞I/O,另一个使用asyncio兼容的版本.这为两个上下文提供了理想的实现,但需要两倍的工作.