在python多处理池中获取worker的唯一ID

Jos*_*del 41 python multiprocessing

有没有办法为python多处理池中的每个worker分配一个唯一的ID,使得池中特定worker正在运行的作业可以知道哪个worker正在运行它?根据文档,a Process有一个name但是

该名称是一个字符串,仅用于识别目的.它没有语义.可以为多个进程指定相同的名称.

对于我的特定用例,我想在一组四个GPU上运行一堆作业,并且需要设置作业应该运行的GPU的设备编号.因为作业的长度不均匀,我想确保在上一个作业完成之前尝试在其上运行的作业的GPU上没有发生冲突(因此这不能预先为ID分配ID提前工作单位).

sen*_*rle 69

看起来你想要的很简单:multiprocessing.current_process().例如:

import multiprocessing

def f(x):
    print multiprocessing.current_process()
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))
Run Code Online (Sandbox Code Playgroud)

输出:

$ python foo.py 
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-3, started daemon)>
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-4, started daemon)>
[0, 1, 4, 9, 16, 25]
Run Code Online (Sandbox Code Playgroud)

这将返回进程对象本身,因此进程可以是其自己的标识.你也可以id在它上面调用一个唯一的数字id - 在cpython中,这是进程对象的内存地址,所以我认为没有任何重叠的可能性.最后,您可以使用流程identpid属性或属性 - 但只有在流程启动后才会设置.

此外,查看源代码,在我看来,很可能自动生成的名称(如Process上面repr字符串中的第一个值所示)是唯一的.为每个进程multiprocessing维护一个itertools.counter对象,用于为_identity它产生的任何子进程生成一个元组.因此,顶级进程使用单值id生成子进程,并且它们使用双值id生成进程,依此类推.然后,如果没有名称传递给Process构造函数,它只是使用自动生成基于_identity 的名称':'.join(...).然后使用Pool 更改进程的名称replace,使自动生成的ID保持不变.

所有这些的结果是尽管两个Processes 可能具有相同的名称,因为您可以在创建它们时为它们指定相同的名称,如果您不触摸name参数,它们是唯一的.此外,理论上您可以使用_identity唯一标识符; 但是我认为他们因为一个原因让这个变量私有!

以上实例的一个例子:

import multiprocessing

def f(x):
    created = multiprocessing.Process()
    current = multiprocessing.current_process()
    print 'running:', current.name, current._identity
    print 'created:', created.name, created._identity
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))
Run Code Online (Sandbox Code Playgroud)

输出:

$ python foo.py 
running: PoolWorker-1 (1,)
created: Process-1:1 (1, 1)
running: PoolWorker-2 (2,)
created: Process-2:1 (2, 1)
running: PoolWorker-3 (3,)
created: Process-3:1 (3, 1)
running: PoolWorker-1 (1,)
created: Process-1:2 (1, 2)
running: PoolWorker-2 (2,)
created: Process-2:2 (2, 2)
running: PoolWorker-4 (4,)
created: Process-4:1 (4, 1)
[0, 1, 4, 9, 16, 25]
Run Code Online (Sandbox Code Playgroud)


Ste*_*han 6

您可以使用multiprocessing.Queue来存储 id,然后在池进程初始化时获取 id。

好处:

  • 您不需要依赖内部结构。
  • 如果您的用例是管理资源/设备,那么您可以直接输入设备编号。这也将确保没有设备被使用两次:如果您的池中的进程多于设备,则额外的进程将被阻止queue.get()并且不会执行任何工作(这不会阻止您的 porgram,或者至少它没有我测试过)。

缺点:

  • 您有额外的通信开销,并且生成池进程需要更长的时间:如果没有sleep(1)示例中的所有工作,则第一个进程可能会执行所有工作,因为其他进程尚未完成初始化。
  • 你需要一个全局的(或者至少我不知道解决方法)

例子:

import multiprocessing
from time import sleep

def init(queue):
    global idx
    idx = queue.get()

def f(x):
    global idx
    process = multiprocessing.current_process()
    sleep(1)
    return (idx, process.pid, x * x)

ids = [0, 1, 2, 3]
manager = multiprocessing.Manager()
idQueue = manager.Queue()

for i in ids:
    idQueue.put(i)

p = multiprocessing.Pool(8, init, (idQueue,))
print(p.map(f, range(8)))
Run Code Online (Sandbox Code Playgroud)

输出:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)]
Run Code Online (Sandbox Code Playgroud)

请注意,尽管池包含 8 个进程并且一个 idx 仅由一个进程使用,但只有 4 个不同的 pid。