Python多处理问题?

use*_*424 5 python zeromq

我有一个包含500个输入文件的文件夹(所有文件的总大小约为500 [MB]).

我想编写一个python执行以下操作的脚本:

(1)将所有输入文件加载到内存中

(2)初始化一个python稍后将使用的空列表...参见bullet (4)

(3)启动15个不同的(独立的)进程:每个进程使用相同的输入数据[from (1)] - 但使用不同的算法来处理它,从而产生不同的结果

(4)我希望所有独立的过程[从步骤(3)]将它们的输出存储在同一个python列表中[在步骤中初始化的相同列表(2)]

一旦所有15个流程完成运行,我将one python list包含所有15个独立流程的结果.

我的问题是,是否有可能有效地完成上述工作python?如果是这样,你能提供一个方案/示例代码,说明如何这样做吗?

注意#1:我将在强大的多核服务器上运行它; 所以这里的目标是在所有独立进程中共享一些内存{ input data,output list} 时使用所有处理能力.

注意#2:我在一个Linux环境中工作

Tom*_*lis 6

好吧,我只是使用zeromq来向多个发布者演示单个订阅者.您可能可以对队列执行相同的操作,但您需要更多地管理它们.zeromq套接字只是工作,这使得它很适合这样的IMO.

"""
demo of multiple processes doing processing and publishing the results
to a common subscriber
"""
from multiprocessing import Process


class Worker(Process):
    def __init__(self, filename, bind):
        self._filename = filename
        self._bind = bind
        super(Worker, self).__init__()

    def run(self):
        import zmq
        import time
        ctx = zmq.Context()
        result_publisher = ctx.socket(zmq.PUB)
        result_publisher.bind(self._bind)
        time.sleep(1)
        with open(self._filename) as my_input:
            for l in my_input.readlines():
                result_publisher.send(l)

if __name__ == '__main__':
    import sys
    import os
    import zmq

    #assume every argument but the first is a file to be processed
    files = sys.argv[1:]

    # create a worker for each file to be processed if it exists pass
    # in a bind argument instructing the socket to communicate via ipc
    workers = [Worker(f, "ipc://%s_%s" % (f, i)) for i, f \
               in enumerate((x for x in files if os.path.exists(x)))]

    # create subscriber socket
    ctx = zmq.Context()

    result_subscriber = ctx.socket(zmq.SUB)
    result_subscriber.setsockopt(zmq.SUBSCRIBE, "")

    # wire up subscriber to whatever the worker is bound to 
    for w in workers:
        print w._bind
        result_subscriber.connect(w._bind)

    # start workers
    for w in workers:
        print "starting workers..."
        w.start()

    result = []

    # read from the subscriber and add it to the result list as long
    # as at least one worker is alive
    while [w for w in workers if w.is_alive()]:
        result.append(result_subscriber.recv())
    else:
        # output the result
        print result
Run Code Online (Sandbox Code Playgroud)

哦,并获得zmq

$ pip install pyzmq-static
Run Code Online (Sandbox Code Playgroud)

  • @user非常欢迎你.zeromq真的很酷.如果您的问题可以表示为它支持的网络拓扑的混合,那么它使进程间通信变得非常容易.花一个周末玩它,跟随文档,你可能会发现,它开辟了解决涉及多个过程的问题的全部可能性. (2认同)