如何连接不断生成和使用数据的asyncio.coroutines?

det*_*tly 5 python python-3.4 python-asyncio

我正在尝试学习如何(惯用)使用Python 3.4 asyncio.我最大的绊脚石是如何"链接"连续消耗数据的协同程序,用它更新状态,并允许该状态被另一个协同程序使用.

我期望从这个示例程序中观察到的行为只是定期报告从子进程接收的数字总和.报告应该以与Source从子进程接收数字的速率大致相同的速率发生.报告功能中的IO阻塞不应阻止从子进程读取.如果报告功能阻塞的时间长于从子进程读取的迭代次数,我不关心它是否会向前跳过或立即报告一堆; 但是应该reporter()expect_exact()足够长的时间范围内的迭代次数.

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)

        p.terminate()

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source # ???
        print("New sum is: {:d}".format(new_sum))
        # Potentially some other blocking operation
        yield from limited_throughput.write(new_sum)

def main():
    loop = asyncio.get_event_loop()

    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)

    # Again, not sure what goes here...
    asyncio.async(reporter(source))

    loop.run_until_complete(source.run())
    loop.close()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

这个例子需要pexpect从git安装; 您可以轻松地替换run()为:

@asyncio.coroutine
def run(self):
    yield from self.flag.wait()

    while self.flag.is_set():
        value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
        self.sum += value
Run Code Online (Sandbox Code Playgroud)

但是我感兴趣的真正的子流程需要在a中运行pty,我认为这意味着提供的子流程传输/协议框架asyncio将不足以满足这一要求.关键是异步活动的源是可以使用的协同程序yield from.

请注意,reporter()此示例中的函数不是有效代码; 我的问题是我不知道那里应该去做什么.理想情况下,我想将reporter()代码分开run(); 这个问题的关键在于看看如何使用中的组件将更复杂的程序分解为更小的代码单元asyncio.

有没有办法用asyncio模块构建这种行为?

det*_*tly 5

锁定原语和队列asyncio本身提供了一些执行此操作的机制.

条件

asyncio.Condition()规定将被通知的状态的一种方式.如果丢弃某些事件并不重要,请使用此选项.

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

        # For consumers
        self.ready = asyncio.Condition()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
            with (yield from self.ready):
                self.ready.notify_all() # Or just notify() depending on situation

        p.terminate()

    @asyncio.coroutine
    def read(self):
        with (yield from self.ready):
            yield from self.ready.wait()
            return self.sum


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff in here
Run Code Online (Sandbox Code Playgroud)

队列

asyncio.Queue()让你把你的数据在队列中(无论是LIFO或FIFO),并有别的东西从它读.如果您绝对想要回复每个事件,即使您的消费者落后(及时),请使用此选项.请注意,如果限制队列的大小,如果您的消费者足够慢,您的生产者最终会阻止.

请注意,这允许我们转换sum为局部变量.

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        # NOTE: self.sum removed!

        # For consumers
        self.output = asyncio.Queue()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        sum = 0

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            sum += int(p.before)
            yield from self.output.put(sum)

        p.terminate()

    @asyncio.coroutine
    def read(self):
        return (yield from self.output.get())

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff here
Run Code Online (Sandbox Code Playgroud)

请注意,Python 3.4.4添加task_done()join()方法Queue,以便您在知道消费者完成后(适用时)优雅地完成所有内容的处理.