bal*_*alu 12 python queue shared-memory multiprocessing python-asyncio
I want to use a queue to pass data from the parent process to a child process launched via multiprocessing.Process. However, since the parent process uses Python's new asyncio library, the queue methods need to be non-blocking. As far as I can tell, asyncio.Queue is intended for inter-task communication and cannot be used for inter-process communication. Also, I know that multiprocessing.Queue has the put_nowait() and get_nowait() methods, but I actually need coroutines that still block the current task (though not the whole process). Is there some way to create coroutines wrapping put_nowait()/get_nowait()? On another note, are the threads that multiprocessing.Queue uses internally compatible with an event loop running in the same process?
If not, what other options do I have? I know I could implement such a queue myself on top of asynchronous sockets, but I was hoping to avoid that...
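A minimal sketch (in modern async/await syntax, not from the original post) of the wrapper idea hinted at above: keep retrying get_nowait() and yield control back to the event loop in between, so only the current task waits. The poll_interval parameter is an illustrative choice, not part of any standard API.

```python
import asyncio
import multiprocessing
import queue

async def coro_get(q, poll_interval=0.05):
    # Poll the non-blocking get_nowait(); between attempts, sleep so
    # other tasks on the event loop keep running. Only this task blocks.
    while True:
        try:
            return q.get_nowait()
        except queue.Empty:
            await asyncio.sleep(poll_interval)

q = multiprocessing.Queue()
q.put(42)
result = asyncio.run(coro_get(q))
print(result)  # 42
```

This trades latency for simplicity: the result arrives up to poll_interval late, whereas the executor-based approaches below block a worker thread instead of polling.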
Edit:
I also considered using pipes instead of sockets, but it seems that asyncio is not compatible with multiprocessing.Pipe(). More precisely, Pipe() returns a tuple of Connection objects, which are not file-like objects. However, asyncio.BaseEventLoop's methods add_reader()/add_writer() and connect_read_pipe()/connect_write_pipe() all expect file-like objects, so it is impossible to asynchronously read from/write to such a Connection object. In contrast, the usual file-like objects that the subprocess package uses as pipes pose no problem at all and can easily be used in combination with asyncio.
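To illustrate that contrast, here is a minimal sketch (not from the original post; Unix only, since the Windows proactor loop does not support add_reader()) showing that the event loop happily watches a plain OS-level pipe file descriptor, which is exactly the file-like interface multiprocessing.Pipe()'s Connection objects lack:

```python
import asyncio
import os

loop = asyncio.new_event_loop()
r, w = os.pipe()  # plain file descriptors, unlike multiprocessing.Pipe()'s Connections
received = []

def on_readable():
    # Called by the event loop once the read end has data available
    received.append(os.read(r, 100))
    loop.stop()

loop.add_reader(r, on_readable)  # accepts a raw fd
os.write(w, b"hello")
loop.run_forever()
loop.remove_reader(r)
os.close(r)
os.close(w)
loop.close()
print(received[0])  # b'hello'
```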
Update:
I decided to explore the pipe approach a bit further: I converted the Connection objects returned by multiprocessing.Pipe() into file-like objects by retrieving their file descriptor via fileno() and passing it to os.fdopen(). Finally, I passed the resulting file-like objects to the event loop's connect_read_pipe()/connect_write_pipe(). (There is some mailing list discussion on a related issue if anybody is interested in the exact code.) However, read()ing the stream gave me an OSError: [Errno 9] Bad file descriptor and I didn't manage to fix this. Also considering the missing support for Windows, I will not pursue this any further.
dan*_*ano 13
Here is an implementation of a multiprocessing.Queue object that can be used with asyncio. It provides the entire multiprocessing.Queue interface, with the addition of coro_get and coro_put methods, which are asyncio.coroutines that can be used to asynchronously get from/put into the queue. The implementation details are essentially the same as the second example of my other answer: a ThreadPoolExecutor is used to make the get/put asynchronous, and a multiprocessing.managers.SyncManager.Queue is used to share the queue between processes. The only additional trick is implementing __getstate__ to keep the object picklable despite using a non-picklable ThreadPoolExecutor as an instance variable.
import asyncio
from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def AsyncProcessQueue(maxsize=0):
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._real_executor = None
        self._cancelled_join = False

    @property
    def _executor(self):
        if not self._real_executor:
            self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
        return self._real_executor

    def __getstate__(self):
        self_dict = self.__dict__
        self_dict['_real_executor'] = None
        return self_dict

    def __getattr__(self, name):
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" %
                                 (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.put, item))

    @asyncio.coroutine
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._real_executor and not self._cancelled_join:
            self._real_executor.shutdown()

@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    print("Passing %s to parent" % ok)
    yield from q.coro_put(ok)  # Non-blocking
    item = q.get()  # Can be used with the normal blocking API, too
    print("got %s back from parent" % item)

def do_coro_proc_work(q, stuff, stuff2):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))

@asyncio.coroutine
def do_work(q):
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_coro_proc_work, q, 1, 2)
    item = yield from q.coro_get()
    print("Got %s from worker" % item)
    item = item + 25
    q.put(item)

if __name__ == "__main__":
    q = AsyncProcessQueue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))
Output:
Passing 3 to parent
Got 3 from worker
got 28 back from parent
As you can see, you can use the AsyncProcessQueue both synchronously and asynchronously, from either the parent or the child process. It doesn't require any global state, and by encapsulating most of the complexity in a class, it's more elegant to use than my original answer.
You could probably get better performance using sockets directly, but getting that to work in a cross-platform way seems to be pretty tricky. This approach also has the advantage of being usable across multiple workers, and won't require you to do any pickling/unpickling yourself, etc.
The multiprocessing library isn't particularly well-suited for use with asyncio. Depending on how you plan to use multiprocessing/multiprocessing.Queue, however, you may be able to replace it entirely with a concurrent.futures.ProcessPoolExecutor:
import asyncio
from concurrent.futures import ProcessPoolExecutor

def do_proc_work(stuff, stuff2):  # This runs in a separate process
    return stuff + stuff2

@asyncio.coroutine
def do_work():
    out = yield from loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                                          do_proc_work, 1, 2)
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work())
Output:
3
If you absolutely need a multiprocessing.Queue, it looks like it behaves well when combined with a ProcessPoolExecutor:
import asyncio
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def do_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    time.sleep(5)  # Artificial delay to show that it's running asynchronously
    print("putting output in queue")
    q.put(ok)

@asyncio.coroutine
def async_get(q):
    """ Calls q.get() in a separate Thread.

    q.get is an I/O call, so it should release the GIL.
    Ideally there would be a real non-blocking I/O-based
    Queue.get call that could be used as a coroutine instead
    of this, but I don't think one exists.
    """
    return (yield from loop.run_in_executor(ThreadPoolExecutor(max_workers=1),
                                            q.get))

@asyncio.coroutine
def do_work(q):
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_proc_work, q, 1, 2)
    coro = async_get(q)  # You could just 'yield from' here directly; it's split up to show that it's asynchronous
    print("Getting queue result asynchronously")
    print((yield from coro))

if __name__ == "__main__":
    m = multiprocessing.Manager()
    q = m.Queue()  # The queue must be inherited by our worker; it can't be explicitly passed in
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))
Output:
Getting queue result asynchronously
putting output in queue
3