如何将coroutine添加到正在运行的asyncio循环?

Pet*_*tri 29 python asynchronous python-3.x python-asyncio

如何为正在运行的asyncio循环添加新的协同程序?IE浏览器.一个已经执行了一套协同程序的程序.

我想作为一种解决方法,可以等待现有协程完成,然后初始化一个新循环(使用附加协程).但有更好的方法吗?

Jas*_*ohi 22

您可以使用它create_task来安排新的协同程序:

import asyncio

async def cor1():
    ...

async def cor2():
    ...

async def main(loop):
    await asyncio.sleep(0)
    t1 = loop.create_task(cor1())
    await cor2()
    await t1

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Run Code Online (Sandbox Code Playgroud)

  • 谢谢你的努力,但据我所知,这个答案是错误的.这里第一次调用`main`创建了协程,然后循环开始.换句话说,此示例在循环开始之前调度协同程序.这不是我要求的. (15认同)
  • 循环运行时,你不能调用`loop.run_until_complete()` (3认同)
  • 这怎么可能是答案呢?该任务是在循环开始之前创建的。如何将任务添加到正在运行的循环中意味着启动了事件循环,然后我们想要将任务添加到循环中 (2认同)

Dot*_*otl 15

要向已经运行的事件循环添加函数,您可以使用:

asyncio.ensure_future(my_coro())

在我的情况下,我一直在使用多线程(threading),asyncio并希望将一个任务添加到已经运行的事件循环中.对于处于相同情况的任何其他人,请务必明确说明事件循环(因为在一个内部不存在Thread).即:

在全球范围内:

event_loop = asyncio.get_event_loop()
Run Code Online (Sandbox Code Playgroud)

然后,在你的内心Thread:

asyncio.ensure_future(my_coro(), loop=event_loop)
Run Code Online (Sandbox Code Playgroud)

  • 要将任务添加到在不同线程(例如主线程)中运行的循环,需要使用:`asyncio.run_coroutine_threadsafe(coro,loop)`。请参阅:https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe (4认同)
  • 这在 3.7 之前的 Python 中是正确的。请参阅 https://docs.python.org/3/library/asyncio-task.html#creating-tasks “Python 3.7 中已添加 create_task()。在 Python 3.7 之前,低级 asyncio.ensure_future() 函数可以用它代替”。 (2认同)

Mik*_*mov 8

您的问题非常接近"如何向正在运行的程序添加函数调用?"

什么时候需要为事件循环添加新的协同程序?

我们来看一些例子.这里程序用两个协同程序并行启动事件循环:

import asyncio
from random import randint


async def coro1():
    res = randint(0,3)
    await asyncio.sleep(res)
    print('coro1 finished with output {}'.format(res))
    return res

async def main():
    await asyncio.gather(
        coro1(),
        coro1()
    ) # here we have two coroutines running parallely

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

输出:

coro1 finished with output 1
coro1 finished with output 2
[Finished in 2.2s]
Run Code Online (Sandbox Code Playgroud)

可能你需要添加一些可以获得结果的协同程序,coro1并在它准备好后立即使用它吗?在这种情况下,只需创建等待的coroutine coro1并使用它的返回值:

import asyncio
from random import randint


async def coro1():
    res = randint(0,3)
    await asyncio.sleep(res)
    print('coro1 finished with output {}'.format(res))
    return res

async def coro2():
    res = await coro1()
    res = res * res
    await asyncio.sleep(res)
    print('coro2 finished with output {}'.format(res))
    return res

async def main():
    await asyncio.gather(
        coro2(),
        coro2()
    ) # here we have two coroutines running parallely

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

输出:

coro1 finished with output 1
coro2 finished with output 1
coro1 finished with output 3
coro2 finished with output 9
[Finished in 12.2s]
Run Code Online (Sandbox Code Playgroud)

将协程视为具有特定语法的常规函数​​.你可以启动一些函数并行执行(by asyncio.gather),你可以在第一次完成后启动下一个函数,你可以创建调用其他函数的新函数.

  • 协同程序*同时运行*,而不是*并行运行*.不太一样的事情. (10认同)

Ada*_*dam 8

这里的答案似乎都没有完全回答这个问题。通过让“父”任务为您执行,可以将任务添加到正在运行的事件循环中。我不确定确保父母在孩子都完成之前不会结束的最 Pythonic 方式是什么(假设这是您想要的行为),但这确实有效。

import asyncio
import random


async def add_event(n):
    print('starting ' + str(n))
    await asyncio.sleep(n)
    print('ending ' + str(n))
    return n


async def main(loop):

    added_tasks = []

    delays = [x for x in range(5)]

    # shuffle to simulate unknown run times
    random.shuffle(delays)

    for n in delays:
        print('adding ' + str(n))
        task = loop.create_task(add_event(n))
        added_tasks.append(task)
        await asyncio.sleep(0)

    print('done adding tasks')

    results = await asyncio.gather(*added_tasks)
    print('done running tasks')

    return results


loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(loop))
print(results)
Run Code Online (Sandbox Code Playgroud)


Pav*_*l T 8

如果任务是将协程添加到已经在执行一些协程的循环中,那么您可以使用我的这个解决方案

import asyncio
import time
from threading import Thread

from random import randint


# first, we need a loop running in a parallel Thread
class AsyncLoopThread(Thread):
    def __init__(self):
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()

    def run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()


# example coroutine
async def coroutine(num, sec):
    await asyncio.sleep(sec)
    print(f'Coro {num} has finished')


if __name__ == '__main__':
    # init a loop in another Thread
    loop_handler = AsyncLoopThread()
    loop_handler.start()

    # adding first 5 coros
    for i in range(5):
        print(f'Add Coro {i} to the loop')
        asyncio.run_coroutine_threadsafe(coroutine(i, randint(3, 5)), loop_handler.loop)

    time.sleep(3)
    print('Adding 5 more coros')

    # adding 5 more coros
    for i in range(5, 10):
        print(f'Add Coro {i} to the loop')
        asyncio.run_coroutine_threadsafe(coroutine(i, randint(3, 5)), loop_handler.loop)

    # let them all finish
    time.sleep(60)
    
Run Code Online (Sandbox Code Playgroud)

执行此示例后,我们将获得以下输出:

Add Coro 0 to the loop
Add Coro 1 to the loop
Add Coro 2 to the loop
Add Coro 3 to the loop
Add Coro 4 to the loop
Coro 0 has finished
Adding 5 more coros
Add Coro 5 to the loop
Add Coro 6 to the loop
Add Coro 7 to the loop
Add Coro 8 to the loop
Add Coro 9 to the loop
Coro 1 has finished
Coro 3 has finished
Coro 2 has finished
Coro 4 has finished
Coro 9 has finished
Coro 5 has finished
Coro 7 has finished
Coro 6 has finished
Coro 8 has finished

Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)