Boh*_*nik 5 python generator callback coroutine
假设我们有一些库(例如用于 XML 解析)接受回调并在每次遇到某些事件时调用它(例如找到一些 XML 标记)。我希望能够将这些回调转换为可以通过 for 循环进行迭代的生成器。在不使用线程或收集所有回调结果(即使用延迟评估)的情况下,这在 Python 中是可能的吗?
例子:
# this is how I can produce the items
def callback(item)
# do something with each item
parser.parse(xml_file, callback=callback)
# this is how the items should be consumed
for item in iter_parse(xml_file):
print(item)
Run Code Online (Sandbox Code Playgroud)
我试图研究是否可以使用协程,但似乎协程对于从生产者推送数据很有用,而生成器将数据拉到消费者。
自然的想法是生产者和消费者将成为来回 ping 执行流程的协程。
我已经设法让生产者 - 消费者模式与 asyncio 循环一起工作(以类似于这个答案的方式)。但是它不能像 for 循环中的生成器一样使用:
import asyncio
q = asyncio.Queue(maxsize=1)
@asyncio.coroutine
def produce(data):
for v in data:
print("Producing:", v)
yield from q.put(v)
print("Producer waiting")
yield from q.put(None)
print("Producer done")
@asyncio.coroutine
def consume():
while True:
print("Consumer waiting")
value = yield from q.get()
print("Consumed:", value)
if value is not None:
# process the value
yield from asyncio.sleep(0.5)
else:
break
print("Consumer done")
tasks = [
asyncio.Task(consume()),
asyncio.Task(produce(data=range(5)))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
Run Code Online (Sandbox Code Playgroud)
问题是结果不能在 for 循环中迭代,因为它是由循环管理的。
当我重写代码以便从普通函数调用回调时,问题是从回调调用的 asyncio.Queue.put() 不会阻塞并且计算不是惰性的。
import asyncio
q = asyncio.Queue(maxsize=1)
def parse(data, callback):
for value in data:
# yield from q.put(value)
callback(value)
@asyncio.coroutine
def produce(data):
@asyncio.coroutine
def enqueue(value):
print('enqueue()', value)
yield from q.put(value)
def callback(value):
print('callback()', value)
asyncio.async(enqueue(value))
parse(data, callback)
print('produce()')
print('produce(): enqueuing sentinel value')
asyncio.async(enqueue(None))
print('produce(): done')
@asyncio.coroutine
def consume():
print('consume()')
while True:
print('consume(): waiting')
value = yield from q.get()
print('consumed:', value)
if value is not None:
# here we'd like to yield and use this in a for loop elsewhere
print(value)
else:
break
print('consume(): done')
tasks = [
asyncio.Task(consume()),
asyncio.Task(produce(range(5)))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# I'd like:
# for value in iter_parse(data=range(5)):
# print('consumed:', value)
Run Code Online (Sandbox Code Playgroud)
这种计算甚至可以使用 asyncio 进行,还是我需要使用 greenlet 或 gevent?我似乎在 gevent 中可以在 for 循环中迭代异步结果,但如果可能的话,我不喜欢依赖另一个库,而且它还没有完全为 Python 3 做好准备。
| 归档时间: |
|
| 查看次数: |
709 次 |
| 最近记录: |