Way*_*ner 56 python multithreading asynchronous multiprocessing python-asyncio
几乎每个人都知道他们第一次看到Python中的线程时,GIL会让那些真正希望并行处理的人生活变得悲惨 - 或者至少给它一个机会.
我目前正在研究像Reactor模式这样的实现.实际上我想在一个线程上监听传入的套接字连接,当有人试图连接时,接受该连接并将其传递给另一个线程进行处理.
我(还)不确定我可能会遇到什么样的负担.我知道目前设置的传入消息上限为2MB.从理论上讲,我们每秒可以获得数千(虽然我不知道我们是否已经看到过类似的东西).处理消息所花费的时间并不是非常重要,但显然更快会更好.
我正在研究Reactor模式,并开发了一个使用该multiprocessing库的小例子(至少在测试中)似乎工作得很好.但是,现在/很快我们将有asyncio库可用,它将为我处理事件循环.
是否有任何东西可以通过组合asyncio和咬我multiprocessing?
dan*_*ano 65
尽管你不应该直接使用,但你应该能够安全地结合asyncio并且multiprocessing没有太多麻烦multiprocessing.asyncio(和任何其他基于事件循环的异步框架)的主要罪点是阻止事件循环.如果您尝试multiprocessing直接使用,只要您阻止等待子进程,就会阻止事件循环.显然,这很糟糕.
避免这种情况的最简单方法是使用BaseEventLoop.run_in_executor执行函数concurrent.futures.ProcessPoolExecutor.ProcessPoolExecutor是一个使用实现的进程池multiprocessing.Process,但asyncio内置支持在其中执行函数而不阻塞事件循环.这是一个简单的例子:
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
def blocking_func(x):
time.sleep(x) # Pretend this is expensive calculations
return x * 5
@asyncio.coroutine
def main():
#pool = multiprocessing.Pool()
#out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
executor = ProcessPoolExecutor()
out = yield from loop.run_in_executor(executor, blocking_func, 10) # This does not
print(out)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
对于大多数情况,仅此功能就足够了.如果你发现自己需要其他结构的multiprocessing,如Queue,Event,Manager,等,有一个所谓的第三方库aioprocessing(全面披露:我写的),它提供asyncio所有的兼容版本的multiprocessing数据结构.这是一个示例演示:
import time
import asyncio
import aioprocessing
import multiprocessing
def func(queue, event, lock, items):
with lock:
event.set()
for item in items:
time.sleep(3)
queue.put(item+5)
queue.close()
@asyncio.coroutine
def example(queue, event, lock):
l = [1,2,3,4,5]
p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
p.start()
while True:
result = yield from queue.coro_get()
if result is None:
break
print("Got result {}".format(result))
yield from p.coro_join()
@asyncio.coroutine
def example2(queue, event, lock):
yield from event.coro_wait()
with (yield from lock):
yield from queue.coro_put(78)
yield from queue.coro_put(None) # Shut down the worker
if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue = aioprocessing.AioQueue()
lock = aioprocessing.AioLock()
event = aioprocessing.AioEvent()
tasks = [
asyncio.async(example(queue, event, lock)),
asyncio.async(example2(queue, event, lock)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Run Code Online (Sandbox Code Playgroud)
是的,有很多位可能(或可能不会)咬你.
asyncio它期望在一个线程或进程上运行时.这不(本身)与并行处理一起使用.您不得不分配工作,同时在单个线程/进程中保留IO操作(特别是那些在套接字上的操作).asyncio而不关闭它.下一个障碍是您不能简单地将文件描述符发送到其他进程,除非您使用来自C扩展的特定于平台的(可能是Linux)代码.multiprocessing已知该模块创建了许多用于通信的线程.大多数情况下,当您使用通信结构(例如Queues)时,会生成一个线程.不幸的是,这些线程并非完全不可见.例如,他们可能无法彻底拆除(当您打算终止您的程序时),但根据其数量,资源使用可能会自行显着.如果您真的打算在个别流程中处理个别关联,我建议您研究不同的方法.例如,您可以将套接字置于侦听模式,然后同时接受来自多个工作进程的连接.一旦工作人员完成处理请求,它就可以接受下一个连接,因此您仍然可以使用比为每个连接分配进程更少的资源.例如,Spamassassin和Apache(mpm prefork)可以使用此工作模型.根据您的使用情况,它可能会更容易,更强大.具体来说,您可以在提供配置数量的请求后让您的工作人员死亡,并由主进程重新生成,从而消除内存泄漏的许多负面影响.