RuntimeError:async + apscheduler中的线程中没有当前事件循环

Val*_*lev 24 python python-3.x apscheduler python-asyncio aiohttp

我有一个异步功能,需要每隔N分钟使用apscheduller运行一次.下面有一个python代码

URL_LIST = ['<url1>',
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = [] # dictionary of start times for each url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a url, using specified ClientSession."""
    async with session.get(url) as response:
        resp = await response.read()
        print(resp)

if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
Run Code Online (Sandbox Code Playgroud)

但是当我试图运行它时,我有下一个错误信息

Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.
Run Code Online (Sandbox Code Playgroud)

你能帮帮我吗?Python 3.6,APScheduler 3.3.1,

小智 56

在你的def demo_async(urls),尝试替换:

loop = asyncio.get_event_loop()
Run Code Online (Sandbox Code Playgroud)

有:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Run Code Online (Sandbox Code Playgroud)

  • 你好@AlexGrönholm,你能解释一下为什么这很糟糕吗?这将具有巨大的价值。 (8认同)
  • 有时你想要第二个事件循环.有时候你没有 (6认同)
  • 不要这样做!这只会导致第二个事件循环运行。鉴于APScheduler对协程函数的本机支持,这是完全没有意义的。如果您尝试使在一个事件循环中运行的事物与第一个事件循环交互,将会发生不好的事情。 (4认同)
  • @AlexGrönholm,您能否详细说明您的第二条评论,您甚至可以将我重定向到任何博客或文档。我需要在生成的新进程中使用“get_event_loop”。但我收到这个错误。上面的答案可以解决我的问题。 (4认同)
  • 我发现在单独的线程中运行循环时,这是必需的。 (2认同)

小智 17

我遇到了类似的问题,我希望我的 asyncio 模块可以从非 asyncio 脚本(在 gevent 下运行......不要问......)调用。下面的代码解决了我的问题,因为它尝试获取当前事件循环,但如果当前线程中没有事件循环,则会创建一个事件循环。在 python 3.9.11 中测试。

try:
    loop = asyncio.get_event_loop()
except RuntimeError as e:
    if str(e).startswith('There is no current event loop in thread'):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        raise
Run Code Online (Sandbox Code Playgroud)

  • @alelom 这个答案只受到你的批评,而且没有任何理由。如果您想要获取对默认事件循环的引用,但您不知道当前是否存在正在运行的循环,则此答案也是有用的。在这两种情况下,您似乎都不理解问题或代码的目的。 (5认同)

Asa*_*ssi 7

使用asyncio.run()而不是直接使用事件循环。它创建一个新循环并在完成时关闭它。

这是“运行”的样子:

if events._get_running_loop() is not None:
    raise RuntimeError(
        "asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = events.new_event_loop()
try:
    events.set_event_loop(loop)
    loop.set_debug(debug)
    return loop.run_until_complete(main)
finally:
    try:
        _cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        events.set_event_loop(None)
        loop.close()
Run Code Online (Sandbox Code Playgroud)


Ale*_*olm 5

只需直接传递fetch_all即可scheduler.add_job()。异步调度程序支持协程功能作为作业目标。

如果目标可调用对象不是协程函数,则将在工作线程中运行(由于历史原因),因此是例外。

  • 第二个事件循环只会增加更多的复杂性,但没有任何好处。 (2认同)

Jat*_*mir 5

没有提到的重要事情是为什么会发生错误。对我个人而言,了解为什么会发生错误与解决实际问题同样重要。

让我们看一下of的get_event_loop实现BaseDefaultEventLoopPolicy

class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    ...

    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop
Run Code Online (Sandbox Code Playgroud)

您可以看到self.set_event_loop(self.new_event_loop())仅在满足以下所有条件时才执行:

  • self._local._loop is None- _local._loop未设置
  • not self._local._set_called- set_event_loop还没被叫
  • isinstance(threading.current_thread(), threading._MainThread) -当前线程是主要线程(在您的情况下,这不是True)

因此引发异常,因为在当前线程中未设置任何循环:

if self._local._loop is None:
    raise RuntimeError('There is no current event loop in thread %r.'
                       % threading.current_thread().name)
Run Code Online (Sandbox Code Playgroud)

  • 因为在 Linux 上,SIGCHLD 只传递给主线程,所以终止的子进程只能通过在主线程中运行的事件循环来确认。 (2认同)