krd*_*rdx 11 python python-3.x python-asyncio concurrent.futures
我有一个long_task运行大量cpu绑定计算的函数,我希望通过使用新的asyncio框架使其异步.生成的long_task_async函数使用a ProcessPoolExecutor将工作卸载到不同的进程,不受GIL约束.
麻烦的是,由于某种原因,concurrent.futures.Future实例ProcessPoolExecutor.submit从投掷时返回a TypeError.这是设计的吗?那些期货与asyncio.Future班级不兼容吗?什么是解决方法?
我也注意到发电机不是可拣选的,所以提交一个couroutine ProcessPoolExecutor就会失败.这个也有任何清洁的解决方案吗?
import asyncio
from concurrent.futures import ProcessPoolExecutor
@asyncio.coroutine
def long_task():
yield from asyncio.sleep(4)
return "completed"
@asyncio.coroutine
def long_task_async():
with ProcessPoolExecutor(1) as ex:
return (yield from ex.submit(long_task)) #TypeError: 'Future' object is not iterable
# long_task is a generator, can't be pickled
loop = asyncio.get_event_loop()
@asyncio.coroutine
def main():
n = yield from long_task_async()
print( n )
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
Eev*_*vee 11
您想要使用loop.run_in_executor,它使用concurrent.futures执行程序,但将返回值映射到asyncio未来.
最初的asyncioPEP 表明, concurrent.futures.Future有朝一日可能会增加一种__iter__方法,因此它也可以使用yield from,但是现在这个库的设计只需要yield from支持而已.(否则一些代码在3.3中实际上不起作用.)
Nih*_*rma 11
我们可以通过调用包装concurrent.futures.Future成一个.我用下面的代码试了一下.工作良好asyncio.futureasyncio.wrap_future(Future)
from asyncio import coroutine
import asyncio
from concurrent import futures
def do_something():
ls = []
for i in range(1, 1000000):
if i % 133333 == 0:
ls.append(i)
return ls
@coroutine
def method():
with futures.ProcessPoolExecutor(max_workers=10) as executor:
job = executor.submit(do_something)
return (yield from asyncio.wrap_future(job))
@coroutine
def call_method():
result = yield from method()
print(result)
def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(call_method())
finally:
loop.close()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5844 次 |
| 最近记录: |