3.4.2中的asyncio问题 - 由于某种原因它终止了

dev*_*nID 4 python debugging python-3.x python-asyncio

一个python的新手,花了很多时间阅读文档和其他代码我似乎无法asyncio在Python 3中获得新模块.它一直没有堆栈跟踪终止给我一个线索,应该永远运行但不会.

我试图模仿的基本流程概念如下:

从端口读取: 打开端口 - >读取数据(可变长度) - >放在queue1上

然后处理数据: 从queue1获取数据 - >条件适用 - >结果放在queue2上

然后写入端口: 从queue2获取数据并写入端口

永远地从顶部循环

注意:输入端口上的数据是偶发的,可变长度,因此我可以使用几个块来"序列" asyncio.我理解asyncio将允许一个块到达的情况,然后在我的应用程序响应之前的另一个 - 即该调用get_io_from_port()有助于协同例程的多次执行.这就是我使用队列来确保不阻塞的原因process_queue()

到目前为止我的玩具示例代码:

import queue
import asyncio

@asyncio.coroutine
def process_queue(q1, q2):
    tmp = q1.Get()
    if tmp == 'ABCDEF':
        q2.put('12345')
    elif tmp == 'GHIJKL':
        q2.put =('67890')
    else:
        print('There is a data error')

@asyncio.coroutine
def put_io_to_port(writer, q2):

    if not q2.empty():
        try:
                writer.write(q2.get())

        except IOError as e:
            print('OUT Port issue: ', e)

@asyncio.coroutine
def get_io_from_port(reader, q1):

    try:
        data_i = yield from reader.read(1200)
        q1.put(data_i)

    except IOError as e:
        print('IN Port issue: ', e)

def main():

    q1 = queue()
    q2 = queue()

    loop = asyncio.get_event_loop()     # main loop declaration
    reader, writer = yield from asyncio.open_connection('192.168.1.103', 5555)
                # high-level call open streams - read and write

    print('Start')
    tasks = [
        asyncio.async(get_io_from_port(reader,q1)),
        asyncio.async(process_queue(q1, q2)),
        asyncio.async(put_io_to_port(writer, q2)),]  # do these tasks - in this order

    loop.run_forever(tasks)     # loop through on main loop forever
    loop.close()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

另外,作为一个旁边 - 如何调试此代码 - 即跟踪?可以提出哪些技巧?我正在使用Eclipse和PyDev,但无济于事.

dan*_*ano 5

你在这里犯了几个错误.首先,你把main它看作是一个普通的函数,但是你已经yield from在那里打了个电话,它会自动将它转换成一个生成器.这意味着当你这样做

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

main实际上没有执行; 调用main()只是创建一个立即丢弃的生成器对象(因为你没有将它赋给变量).这就是为什么你很难调试 - 里面的代码main都没有执行.您应该转换main为协同程序并使用它来调用它loop.run_until_complete.

接下来,您正在尝试使用该queue模块,该模块并非设计用于单线程异步程序.一旦你打电话queue.get(),它就会阻止你的主线程,这意味着你的asyncio事件循环将被阻止,这意味着你的整个程序将陷入僵局.你应该使用coroutine-safe asyncio.Queue代替.

你也有竞争条件put_io_to_port.你只是尝试消耗q2它,如果它不是空的,但它可能put_io_to_port在之前执行process_queue有机会运行和填充queue.如果您完全取消if not q2.empty()支票,看起来你会没事的put_io_to_port.

最后,你将使用你的协程添加到事件循环asyncio.async,这很好.但是你有一个评论说# do these tasks, in this order,但这不是该计划的表现方式asyncio.async.它只是将所有协同程序添加到事件循环中,它们都将并行运行.如果你真的希望它们按顺序运行,你应该这样做:

yield from get_io_from_port(reader,q1)
yield from process_queue(q1, q2)
yield from put_io_to_port(writer, q2)
Run Code Online (Sandbox Code Playgroud)

但这里真的没有必要.您可以同时运行所有这些并获得正确的行为; 如果一个协程在另一个协程之前执行,它将只等到它所依赖的协程传递它所需的数据,然后继续执行.

您也有几个错别字在那里(q1.Get(),q2.put =(...),等).

所以,把所有这些修复放在一起,你得到这个:

import queue
import asyncio

@asyncio.coroutine
def process_queue(q1, q2):
    while True:
        tmp = yield from q1.get()
        if tmp == 'ABCDEF':
            yield from q2.put('12345')
        elif tmp == 'GHIJKL':
            yield from q2.put('67890')
        else:
            print('There is a data error')

@asyncio.coroutine
def put_io_to_port(writer, q2):
    while True:
        try:
            data = yield from q2.get()
            writer.write(data)
        except IOError as e:
            print('OUT Port issue: ', e)

@asyncio.coroutine
def get_io_from_port(reader, q1):
    while True:
        try:
            data_i = yield from reader.read(1200)
            yield from q1.put(data_i)
        except IOError as e:
            print('IN Port issue: ', e)

@asyncio.coroutine
def main():
    q1 = asyncio.Queue()
    q2 = asyncio.Queue()

    reader, writer = yield from asyncio.open_connection('192.168.1.103', 5555)
                # high-level call open streams - read and write

    print('Start')
    tasks = [
        asyncio.async(get_io_from_port(reader,q1)),
        asyncio.async(process_queue(q1, q2)),
        asyncio.async(put_io_to_port(writer, q2)),]


if __name__ == '__main__':
    loop = asyncio.get_event_loop()     # main loop declaration
    loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)