如何在Python多处理中动态创建每进程队列

jmm*_*mcd 8 python multiprocessing

我想动态创建多个Processes,其中每个实例都有一个队列,用于传入来自其他实例的消息,每个实例也可以创建新实例.因此,我们最终得到了一个互相发送的流程网络.允许每个实例发送给所有其他实例.

下面的代码可以实现我想要的:它使用a Manager.dict()来存储队列,确保传播更新,以及a Lock()保护对队列的写访问.但是,当添加新队列时,它会抛出"RuntimeError: Queue objects should only be shared between processes through inheritance".

问题是,在启动时,我们不知道最终需要多少队列,因此我们必须动态创建它们.但由于除了施工时我们不能共享队列,我不知道该怎么做.

我知道一种可能性是创建queues一个全局变量而不是传入的托管变量__init__:然后,根据我的理解,问题是queues变量的添加不会传播到其他进程.

编辑我正在研究进化算法.EA是一种机器学习技术.EA模拟"人口",其通过适者生存,交叉和突变而发展.在并行 EA中,如此处,我们还在群体之间进行迁移,对应于进程间通信.群岛也可以产生新岛屿,因此我们需要一种在动态创建的流程之间发送消息的方法.

import random, time
from multiprocessing import Process, Queue, Lock, Manager, current_process
try:
    from queue import Empty as EmptyQueueException
except ImportError:
    from Queue import Empty as EmptyQueueException

class MyProcess(Process):
    def __init__(self, queues, lock):
        super(MyProcess, self).__init__(target=lambda x: self.run(x),
                                     args=tuple())
        self.queues = queues
        self.lock = lock
        # acquire lock and add a new queue for this process
        with self.lock:
            self.id = len(list(self.queues.keys()))
            self.queues[self.id] = Queue()

    def run(self):
        while len(list(self.queues.keys())) < 10:

            # make a new process
            new = MyProcess(self.lock)
            new.start()

            # send a message to a random process
            dest_key = random.choice(list(self.queues.keys()))
            dest = self.queues[dest_key]
            dest.put("hello to %s from %s" % (dest_key, self.id))

            # receive messages
            message = True
            while message:
                try:
                    message = self.queues[self.id].get(False) # don't block
                    print("%s received: %s" % (self.id, message))
                except EmptyQueueException:
                    break

            # what queues does this process know about?
            print("%d: I know of %s" %
                  (self.id, " ".join([str(id) for id in self.queues.keys()])))

            time.sleep(1)

if __name__ == "__main__":
    # Construct MyProcess with a Manager.dict for storing the queues
    # and a lock to protect write access. Start.
    MyProcess(Manager().dict(), Lock()).start()
Run Code Online (Sandbox Code Playgroud)

std*_*err 3

我不完全确定您的用例实际上是什么。也许如果您详细说明为什么要让每个进程动态生成一个具有连接队列的子进程,那么在这种情况下正确的解决方案是什么会更清楚。

无论如何,就目前的问题而言,目前似乎没有一个真正好的方法可以使用多处理动态创建管道或队列。

我认为,如果您愿意在每个进程中生成线程,您也许可以用来multiprocessing.connection.Listener/Client来回通信。我没有生成线程,而是采用了使用网络套接字并选择在线程之间进行通信的方法。

动态进程生成和网络套接字可能仍然不稳定,具体取决于multiprocessing生成/分叉新进程时如何清理文件描述符,并且您的解决方案很可能在 *nix 衍生品上更容易工作。如果您担心套接字开销,您可以使用 unix 域套接字来变得更轻量,但代价是增加在多个工作计算机上运行节点的复杂性。

不管怎样,这里有一个使用网络套接字和全局进程列表来完成此任务的示例,因为我无法找到一个好的方法来完成multiprocessing它。

import collections
import multiprocessing
import random
import select
import socket
import time


class MessagePassingProcess(multiprocessing.Process):
    def __init__(self, id_, processes):
        self.id = id_
        self.processes = processes
        self.queue = collections.deque()
        super(MessagePassingProcess, self).__init__()

    def run(self):
        print "Running"
        inputs = []
        outputs = []
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        address = self.processes[self.id]["address"]
        print "Process %s binding to %s"%(self.id, address)
        server.bind(address)
        server.listen(5)
        inputs.append(server)
        process = self.processes[self.id]
        process["listening"] = True
        self.processes[self.id] = process
        print "Process %s now listening!(%s)"%(self.id, process)
        while inputs:
            readable, writable, exceptional = select.select(inputs,
                                                           outputs,
                                                           inputs,
                                                           0.1)
            for sock in readable:
                print "Process %s has a readable scoket: %s"%(self.id,
                                                              sock)
                if sock is server:
                    print "Process %s has a readable server scoket: %s"%(self.id,
                                                              sock)
                    conn, addr = sock.accept()
                    conn.setblocking(0)
                    inputs.append(conn)
                else:
                    data = sock.recv(1024)
                    if data:
                        self.queue.append(data)
                        print "non server readable socket with data"
                    else:
                        inputs.remove(sock)
                        sock.close()
                        print "non server readable socket with no data"

            for sock in exceptional:
                print "exception occured on socket %s"%(sock)
                inputs.remove(sock)
                sock.close()

            while len(self.queue) >= 1:
                print "Received:", self.queue.pop()

            # send a message to a random process:
            random_id = random.choice(list(self.processes.keys()))
            print "%s Attempting to send message to %s"%(self.id, random_id)
            random_process = self.processes[random_id]
            print "random_process:", random_process
            if random_process["listening"]:
                random_address = random_process["address"]
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    s.connect(random_address)
                except socket.error:
                    print "%s failed to send to %s"%(self.id, random_id)
                else:
                    s.send("Hello World!")                    
                finally:
                    s.close()

            time.sleep(1)

if __name__=="__main__":
    print "hostname:", socket.getfqdn()
    print dir(multiprocessing)
    manager = multiprocessing.Manager()
    processes = manager.dict()
    joinable = []
    for n in xrange(multiprocessing.cpu_count()):
        mpp = MessagePassingProcess(n, processes)
        processes[n] = {"id":n,
                        "address":("127.0.0.1",7000+n),
                        "listening":False,
                        }
        print "processes[%s] = %s"%(n, processes[n])
        mpp.start()
        joinable.append(mpp)
    for process in joinable:
        process.join()
Run Code Online (Sandbox Code Playgroud)

经过大量的打磨和测试,这可能是multiprocessing.Process和/或的逻辑扩展multiprocessing.Pool,因为如果它在标准库中可用,这看起来确实是人们会使用的东西。创建一个使用可供其他队列发现的套接字的 DynamicQueue 类也可能是合理的。

无论如何,希望它有所帮助。如果您找到更好的方法来完成这项工作,请更新。