非阻塞multiprocessing.connection.Listener?

iga*_*ann 12 python sockets asynchronous python-3.3 python-multiprocessing

我使用multiprocessing.connection.Listener进行进程之间的通信,它对我来说是一个魅力.现在我真的很喜欢我的mainloop在客户端的命令之间做一些其他事情.不幸的是,listener.accept()阻止执行,直到建立客户端进程的连接.

是否有一种简单的方法来管理multiprocessing.connection的非阻塞检查?超时?或者我应该使用专用线程?

    # Simplified code:

    from multiprocessing.connection import Listener

    def mainloop():
        listener = Listener(address=(localhost, 6000), authkey=b'secret')

        while True:
            conn = listener.accept() # <---  This blocks!
            msg = conn.recv() 
            print ('got message: %r' % msg)
            conn.close()
Run Code Online (Sandbox Code Playgroud)

小智 1

我自己没有使用 Listener 对象——对于这个任务,我通常使用multiprocessing.Queue; doco,链接如下:

https://docs.python.org/2/library/queue.html#Queue.Queue

该对象可用于通过良好的 API 在 Python 进程之间发送和接收任何可 pickle 的对象;我想您最感兴趣的是:

  • 在过程A中
    • .put('some message')
  • 过程B中
    • .get_nowait() # will raise Queue.Empty if nothing is available- handle that to move on with your execution

唯一的限制是您需要在某个时刻控制两个 Process 对象,以便能够为它们分配队列 - 如下所示:

import time
from Queue import Empty
from multiprocessing import Queue, Process


def receiver(q):
    while 1:
        try:
            message = q.get_nowait()
            print 'receiver got', message
        except Empty:
            print 'nothing to receive, sleeping'
            time.sleep(1)


def sender(q):
    while 1:
        message = 'some message'
        q.put('some message')
        print 'sender sent', message
        time.sleep(1)


some_queue = Queue()

process_a = Process(
    target=receiver,
    args=(some_queue,)
)

process_b = Process(
    target=sender,
    args=(some_queue,)
)

process_a.start()
process_b.start()

print 'ctrl + c to exit'
try:
    while 1:
        time.sleep(1)
except KeyboardInterrupt:
    pass

process_a.terminate()
process_b.terminate()

process_a.join()
process_b.join()
Run Code Online (Sandbox Code Playgroud)

队列很好,因为您实际上可以根据需要拥有任意数量的消费者和生产者(对于分配任务很方便)。

我应该指出,仅仅调用.terminate()进程是不好的形式 - 您应该使用闪亮的新消息传递系统来传递关闭消息或类似性质的消息。