python池与工人进程

Fel*_*lix 28 python multiprocessing

我试图使用Process对象在python中使用worker Pool.每个工作者(一个进程)进行一些初始化(花费非常重要的时间),传递一系列作业(理想情况下使用map()),并返回一些东西.除此之外不需要任何沟通.但是,我似乎无法弄清楚如何使用map()来使用我的worker的compute()功能.

from multiprocessing import Pool, Process

class Worker(Process):
    def __init__(self):
        print 'Worker started'
        # do some initialization here
        super(Worker, self).__init__()

    def compute(self, data):
        print 'Computing things!'
        return data * data

if __name__ == '__main__':
    # This works fine
    worker = Worker()
    print worker.compute(3)

    # workers get initialized fine
    pool = Pool(processes = 4,
                initializer = Worker)
    data = range(10)
    # How to use my worker pool?
    result = pool.map(compute, data)
Run Code Online (Sandbox Code Playgroud)

是一个工作队列的方式,或者我可以使用map()

S.L*_*ott 52

我建议您使用队列.

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            # Use data
Run Code Online (Sandbox Code Playgroud)

现在你可以开始一堆这些,所有这些都是从一个队列中获得的

request_queue = Queue()
for i in range(4):
    Worker(request_queue).start()
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(None) 
Run Code Online (Sandbox Code Playgroud)

这种事情应该允许您分摊多个工人的昂贵的启动成本.


jfs*_*jfs 6

initializer期望一个任意的可调用来执行启动,例如,它可以设置一些全局变量,而不是Process子类; map接受任意迭代:

#!/usr/bin/env python
import multiprocessing as mp

def init(val):
    print('do some initialization here')

def compute(data):
    print('Computing things!')
    return data * data

def produce_data():
    yield -100
    for i in range(10):
        yield i
    yield 100

if __name__=="__main__":
  p = mp.Pool(initializer=init, initargs=('arg',))
  print(p.map(compute, produce_data()))
Run Code Online (Sandbox Code Playgroud)