ds_*_*ser 3 python multiprocessing python-3.x python-multiprocessing
我正在执行下面的代码,它工作正常,但它不会生成到不同的进程,而是有时所有代码都在同一个进程中运行,有时是两个进程合二为一。我使用的是 4 个 cpu 的机器。这段代码有什么问题吗?
def f(values):
print(multiprocessing.current_process())
return values
def main():
p = Pool(4) #number of processes = number of CPUs
keys, values= zip(*data.items()) #ordered keys and values
processed_values= p.map( f, values )
result= dict( zip(keys, processed_values ) )
p.close() # no more tasks
p.join() # wrap up current tasks
Run Code Online (Sandbox Code Playgroud)
结果是
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
Run Code Online (Sandbox Code Playgroud)
而有时也像这样,
<SpawnProcess(SpawnPoolWorker-3, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-3, started daemon)>
Run Code Online (Sandbox Code Playgroud)
有时,
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-4, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
Run Code Online (Sandbox Code Playgroud)
我的问题是,它根据什么将职能分配给工人?我编写代码的方式是根据字典中的键数来决定进程数(考虑到我的数据的键数总是少于我的 CPU)。我的代码将像这样开始 - 主代码读取文件并使用单个进程从中创建一个字典,并且应该将其分支到并发进程数并等待它们处理数据(我正在使用 pool.map ),然后一旦获得子进程的结果,它就开始处理它们。我怎样才能实现这个父等待子进程步骤?
你的代码没有任何问题。您的工作项非常快 - 如此之快,以至于同一个工作进程可以运行该函数,返回结果,然后赢得竞赛以消耗用于multiprocessing.Pool分配工作的内部队列中的下一个任务。当您调用时map,工作项会分成几批并放入Queue. pool.map这是将可迭代中的项目分块并将它们放入队列中的部分实现:
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), None))
Run Code Online (Sandbox Code Playgroud)
每个工作进程运行一个具有无限 while 循环的函数,该循环消耗该队列中的项目*:
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get() # Pulls an item off the taskqueue
except (EOFError, IOError):
debug('worker got EOFError or IOError -- exiting')
break
if task is None:
debug('worker got sentinel -- exiting')
break
job, i, func, args, kwds = task
try:
result = (True, func(*args, **kwds)) # Runs the function you passed to map
except Exception, e:
result = (False, e)
try:
put((job, i, result)) # Sends the result back to the parent
except Exception as e:
wrapped = MaybeEncodingError(e, result[1])
debug("Possible encoding error while sending result: %s" % (
wrapped))
Run Code Online (Sandbox Code Playgroud)
同一个工作人员很可能只是偶然能够使用一个项目,运行func,然后使用下一个项目。这有点奇怪- 我无法在运行与您的示例相同的代码的机器上重现它 - 但让同一个工作人员从队列中获取四个项目中的两个是很正常的。
如果您通过插入调用使工作函数花费更长的时间,您应该始终看到均匀分布time.sleep:
def f(values):
print(multiprocessing.current_process())
time.sleep(1)
return values
Run Code Online (Sandbox Code Playgroud)
* 这实际上并不完全正确 - 有一个线程在主进程中运行,它消耗 from taskqueue,然后将其拉出的内容粘贴到另一个Queue,这就是子进程消耗的内容)