How do I write my own async/await coroutine function in Python?

Ben*_*ari 8 python python-3.x async-await python-asyncio

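I'm trying to write my own awaitable function that can be used in an asyncio loop, like asyncio.sleep() or similar awaitables that asyncio already provides.

Here is what I have done so far: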

import asyncio

def coro1():
    for i in range(1, 10):
        yield i

def coro2():
    for i in range(1, 10):
        yield i*10

class Coro:  # Not used.
    def __await__(self):
        for i in range(1, 10):
            yield i * 100

@asyncio.coroutine
def wrapper1():
    return (yield from coro1())

@asyncio.coroutine
def wrapper2():
    return (yield from coro2())

for i in wrapper1():
    print(i)

print("Above result was obvious which I can iterate around a couroutine.".center(80, "#"))

async def async_wrapper():
    await wrapper1()
    await wrapper2()

loop = asyncio.get_event_loop()
futures = [asyncio.ensure_future(async_wrapper())]
result = loop.run_until_complete(asyncio.gather(*futures))
print(result)

loop.close()

The result I got:

1
2
3
4
5
6
7
8
9
#######Above result was obvious which I can iterate around a couroutine.#########
Traceback (most recent call last):
  File "stack-coroutine.py", line 36, in <module>
    result = loop.run_until_complete(asyncio.gather(*futures))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "stack-coroutine.py", line 30, in async_wrapper
    await wrapper1()
  File "stack-coroutine.py", line 18, in wrapper1
    return (yield from coro1())
  File "stack-coroutine.py", line 5, in coro1
    yield i
RuntimeError: Task got bad yield: 1

The result I expected:

1
10
2
20
3
30
.
.
.

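[NOTE]:

  • I'm not looking for a multithreading or multiprocessing approach.
  • This question is very similar to another question of mine that has not been resolved yet.
  • I'm using Python 3.6.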

Ben*_*ari 3

I found a concurrent/asynchronous approach using generators. However, it is not an asyncio approach:

from collections import deque

def coro1():
    for i in range(1, 5):
        yield i

def coro2():
    for i in range(1, 5):
        yield i*10

print('Async behaviour using default list with O(n)'.center(60, '#'))
tasks = list()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.pop(0)
    try:
        print(next(task))
        tasks.append(task)
    except StopIteration:
        pass

print('Async behaviour using deque with O(1)'.center(60, '#'))
tasks = deque()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.popleft()  # select and remove a task (coro1/coro2).
    try:
        print(next(task))
        tasks.append(task)  # re-queue the task (coro1/coro2) at the back so each gets a turn in rotation.
    except StopIteration:
        pass

Out:

########Async behaviour using default list with O(n)########
1
10
2
20
3
30
4
40
###########Async behaviour using deque with O(1)############
1
10
2
20
3
30
4
40

[UPDATE]:

Finally, I solved this problem using asyncio syntax:

import asyncio

async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # hand control back to the event loop after every iteration.

async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # hand control back to the event loop after every iteration.

async def main():
    futures = [
        asyncio.create_task(coro1()),
        asyncio.create_task(coro2())
    ]
    await asyncio.gather(*futures)

asyncio.run(main())

Out:

1
10
2
20
3
30
4
40
5
50

Another way to run coroutines concurrently is to use async-await expressions with an event loop manager based on the heap queue algorithm, without the asyncio library, its event loop, or the asyncio.sleep() method.
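For the heap-queue-based approach described above, a minimal sketch might look like the following. It is an illustration of the idea, not the original code: `Sleep` and `run_all` are hypothetical names, and the design (an awaitable that yields a wake-up time, and a manager that keeps coroutines in a `heapq` ordered by that time) is an assumption.

```python
import heapq
import time

class Sleep:
    """Hypothetical awaitable: asks the manager to resume us after `seconds`."""
    def __init__(self, seconds):
        self.seconds = seconds

    def __await__(self):
        # The yielded value travels up to whoever drives the coroutine via send().
        yield time.monotonic() + self.seconds

async def coro1():
    for i in range(1, 6):
        print(i)
        await Sleep(0)  # give the other coroutine a turn

async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await Sleep(0)  # give the other coroutine a turn

def run_all(*coros):
    """Hypothetical manager: a tiny event loop built on a heap queue."""
    # Entries are (wake_time, sequence, coroutine). The sequence number breaks
    # ties so that two coroutine objects are never compared with each other.
    heap = []
    seq = 0
    start = time.monotonic()
    for coro in coros:
        heapq.heappush(heap, (start, seq, coro))
        seq += 1
    while heap:
        wake_at, _, coro = heapq.heappop(heap)
        delay = wake_at - time.monotonic()
        if delay > 0:
            time.sleep(delay)  # nothing is due yet, so block until it is
        try:
            next_wake = coro.send(None)  # run until the next `await Sleep(...)`
        except StopIteration:
            continue  # this coroutine has finished; do not re-queue it
        heapq.heappush(heap, (next_wake, seq, coro))
        seq += 1

run_all(coro1(), coro2())
```

Because both coroutines sleep for zero seconds, the sequence number decides the order and the two loops alternate strictly.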

Out:

1
10
2
20
3
30
4
40
5
50
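
As for why the code in the question fails: an asyncio Task only accepts None or a Future from the generator it drives, so yielding 1 raises "Task got bad yield: 1". Below is a minimal sketch of a custom awaitable that asyncio does accept. The class name `YieldToLoop` and the helper `count` are hypothetical; the only real mechanism used is a bare `yield` inside `__await__`, which hands None to the Task so that it reschedules the coroutine on the next loop iteration (in CPython, asyncio.sleep(0) relies on the same bare yield).

```python
import asyncio

class YieldToLoop:
    """Hypothetical awaitable that gives other tasks a chance to run."""
    def __await__(self):
        # A bare yield sends None to the Task, which asyncio treats as
        # "resume me on the next iteration of the event loop".
        yield

async def count(results, factor):
    for i in range(1, 4):
        results.append(i * factor)
        await YieldToLoop()  # switch to the other task after each item

async def main():
    results = []
    # gather() wraps each coroutine in a Task, so the two run interleaved.
    await asyncio.gather(count(results, 1), count(results, 10))
    return results

results = asyncio.run(main())
print(results)  # [1, 10, 2, 20, 3, 30]
```

Yielding any value other than None (or a Future) from `__await__` would reproduce the RuntimeError shown in the question.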