Consider the following code:
#!/usr/bin/env python
# coding=utf-8

from string import letters


def filter_upper(letters):
    for letter in letters:
        if letter.isupper():
            yield letter


def filter_selected(letters, selected):
    selected = set(map(str.lower, selected))
    for letter in letters:
        if letter.lower() in selected:
            yield letter


def main():
    stuff = filter_selected(filter_upper(letters), ['a', 'b', 'c'])
    print(list(stuff))

main()
This is an illustration of a pipeline built from generators. I use this pattern often in practice to build data-processing flows; it works like a UNIX pipe.
What is the most elegant way to refactor these generators into coroutines that suspend execution at each yield?
My first attempt was this:
#!/usr/bin/env python
# coding=utf-8

import asyncio


@asyncio.coroutine
def coro():
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        yield from future


@asyncio.coroutine
def coro2():
    a = yield from coro()
    print(a)

loop = asyncio.get_event_loop()
loop.run_until_complete(coro2())
But for some reason it doesn't work: the variable a ends up as None.
What I have come up with recently:
Server:
#!/usr/bin/env python
# coding=utf-8
"""Server that accepts a client and sends it strings from user input."""
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = ''
port = 5555
s.bind((host, port))
s.listen(1)

print('Listening...')

conn, addr = s.accept()
print('Client ({}) connected.'.format(addr))

while True:
    conn.send(raw_input('Enter data to send: '))
Client:
#!/usr/bin/env python
# coding=utf-8
"""Client that demonstrates processing pipeline."""

import trollius as asyncio
from trollius import From


@asyncio.coroutine
def upper(input, output):
    while True:
        char = yield From(input.get())
        print('Got char: ', char)
        yield From(output.put(char.upper()))


@asyncio.coroutine
def glue(input, output):
    chunk = []
    while True:
        e = yield From(input.get())
        chunk.append(e)
        print('Current chunk: ', chunk)
        if len(chunk) == 3:
            yield From(output.put(chunk))
            chunk = []


@asyncio.coroutine
def tcp_echo_client(loop):
    reader, writer = yield From(asyncio.open_connection('127.0.0.1', 5555,
                                                        loop=loop))
    q1 = asyncio.Queue()
    q2 = asyncio.Queue()
    q3 = asyncio.Queue()

    @asyncio.coroutine
    def printer():
        while True:
            print('Pipeline output: ', (yield From(q3.get())))

    asyncio.async(upper(q1, q2))
    asyncio.async(glue(q2, q3))
    asyncio.async(printer())

    while True:
        data = yield From(reader.read(100))
        print('Data: ', data)
        for byte in data:
            yield From(q1.put(byte))

    print('Close the socket')
    writer.close()


@asyncio.coroutine
def background_stuff():
    while True:
        yield From(asyncio.sleep(3))
        print('Other background stuff...')


loop = asyncio.get_event_loop()
asyncio.async(background_stuff())
loop.run_until_complete(tcp_echo_client(loop))
loop.close()
优于"David Beazley的协同程序"的是,你可以使用asyncio这些处理单元中的所有东西input和output队列.
The downside here is that wiring the pipeline units together requires many queue instances. This could be fixed with a data structure more advanced than asyncio.Queue.
Another downside is that such processing units do not propagate their exceptions to the parent stack frame, because they are "background tasks", whereas "David Beazley's coroutines" do propagate them.
This is what I have in mind: https://gist.github.com/AndrewPashkin/04c287def6d165fc2832
I think the answer here is "You don't". I'm guessing you got this idea from David Beazley's famous coroutine/generator tutorial. In his tutorials, he uses coroutines as basically a reversed generator pipeline: instead of pulling data through the pipeline by iterating over generators, you push data through the pipeline using gen_object.send(). Using that notion of coroutines, your first example would look something like this:
from string import letters


def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.next()
        return cr
    return start


@coroutine
def filter_upper(target):
    while True:
        letter = yield
        if letter.isupper():
            target.send(letter)


@coroutine
def filter_selected(selected):
    selected = set(map(str.lower, selected))
    out = []
    try:
        while True:
            letter = yield
            if letter.lower() in selected:
                out.append(letter)
    except GeneratorExit:
        print out


def main():
    filt = filter_upper(filter_selected(['a', 'b', 'c']))
    for letter in letters:
        filt.send(letter)
    filt.close()

if __name__ == "__main__":
    main()
Now, coroutines in asyncio are similar, in that they are suspendable generator objects which can have data sent into them, but they really aren't meant for the data-pipelining use case at all. They are meant to enable concurrency while you're executing blocking I/O operations: the yield from suspension points allow control to return to the event loop while the I/O happens, and the event loop restarts the coroutine when the I/O completes, sending the data returned by the I/O call into the coroutine. There's really no practical reason to try to use them for this kind of use case, since no blocking I/O is happening at all.
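To make that concrete, here is a minimal sketch of the case asyncio coroutines are actually designed for (asyncio.sleep stands in for a blocking I/O call, and fetch is just an illustrative name). Both coroutines suspend at their yield from points, so the event loop runs them concurrently and the whole thing takes about one second instead of two:
import asyncio

@asyncio.coroutine
def fetch(name, delay):
    # yield from suspends this coroutine; the event loop is free to run
    # other coroutines until the (simulated) I/O completes.
    yield from asyncio.sleep(delay)
    return 'result from {}'.format(name)

@asyncio.coroutine
def main():
    # Both "requests" run concurrently, so this takes about 1 second, not 2.
    results = yield from asyncio.gather(fetch('a', 1), fetch('b', 1))
    print(results)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()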
Also, the problem with your asyncio attempt is that a = yield from coro() assigns a to the return value of coro, but you never actually return anything from coro. You're caught somewhere between treating coro as an actual coroutine and treating it as a generator. It looks like you expect yield from future to send the contents of future from coro to coro2, but that's not how coroutines work. yield from is used to pull data from a coroutine/Future/Task, and return is used to actually send an object back to the caller. So, for coro to actually return something to coro2, you need to do this:
@asyncio.coroutine
def coro():
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        return future
But that will just end with 'a' being returned to coro2. I think to get the output you're expecting, you'd need to do this:
@asyncio.coroutine
def coro():
    future = asyncio.Future()
    future.set_result(['a', 'b', 'c'])
    return future
Which maybe illustrates why asyncio coroutines are not what you want here.
Edit:
Ok, given a situation where you want to use the pipeline in addition to doing actual asynchronous I/O, I think the approach you used in your update is good. As you suggested, it can be made simpler by creating a data structure to help automate the queue management:
class Pipeline(object):
    def __init__(self, *nodes):
        if len(nodes) < 2:
            raise Exception("Need at least two nodes in the pipeline")
        self.start = asyncio.Queue()
        in_ = self.start
        for node in nodes:
            out = asyncio.Queue()
            asyncio.async(node(in_, out))
            in_ = out

    @asyncio.coroutine
    def put(self, val):
        yield from self.start.put(val)


# ... (most code is unchanged)


@asyncio.coroutine
def printer(input_, output):
    # For simplicity, I have the sink taking an output queue. It's not being
    # used, but you could make the final output queue accessible from the
    # Pipeline object and then add a get() method to the Pipeline itself.
    while True:
        print('Pipeline output: ', (yield from input_.get()))


@asyncio.coroutine
def tcp_echo_client(loop):
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 5555,
                                                        loop=loop)
    pipe = Pipeline(upper, glue, printer)
    while True:
        data = yield from reader.read(100)
        if not data:
            break
        print('Data: ', data)
        for byte in data.decode('utf-8'):
            yield from pipe.put(byte)  # Add to the pipe
    print('Close the socket')
    writer.close()
This simplifies the Queue management, but it doesn't address the exception-handling issue. I'm not sure much can be done about that...
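One thing that might help a little (just a sketch, not part of the Pipeline above): if the Pipeline keeps references to the tasks it spawns and attaches a done callback to each, a crashed unit at least gets reported through the event loop's exception handler right away instead of failing silently. The _report_crash helper and the tasks attribute are hypothetical additions, and asyncio.ensure_future is the newer name for asyncio.async:
import asyncio

def _report_crash(task):
    # Runs when a pipeline task finishes. Retrieving the result of a failed
    # task re-raises its exception, which the event loop's exception handler
    # then reports immediately instead of only when the task is garbage
    # collected.
    if not task.cancelled() and task.exception() is not None:
        task.result()

class Pipeline(object):
    def __init__(self, *nodes):
        if len(nodes) < 2:
            raise Exception("Need at least two nodes in the pipeline")
        self.start = asyncio.Queue()
        self.tasks = []  # keep the tasks so callers can await or cancel them
        in_ = self.start
        for node in nodes:
            out = asyncio.Queue()
            task = asyncio.ensure_future(node(in_, out))
            task.add_done_callback(_report_crash)
            self.tasks.append(task)
            in_ = out

    @asyncio.coroutine
    def put(self, val):
        yield from self.start.put(val)

A caller that wants the exception to propagate into its own stack frame could also yield from asyncio.gather(*pipe.tasks) somewhere; since the units loop forever, that only ever returns by raising when one of them fails.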