Python multiprocessing and a shared counter

Dav*_*ide 57 python multiprocessing

I'm having trouble with the multiprocessing module. I'm using a pool of workers with its map method to load data from a large number of files, and each file is analyzed with a custom function. Each time a file is processed I would like to have a counter updated so that I can keep track of how many files remain to be processed. Here is sample code:

import os
from multiprocessing import Pool

def analyze_data(args):
    # do something
    counter += 1
    print(counter)


if __name__ == '__main__':

    list_of_files = os.listdir(some_directory)

    global counter
    counter = 0

    p = Pool()
    p.map(analyze_data, list_of_files)

I can't find a way to do this.

jkp*_*jkp 62

The problem is that the counter variable is not shared between your processes: each separate process creates its own local instance and increments that.

See this section of the documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.

Here is a working version of your example (with some dummy input data). Note that it uses global values, which I would really try to avoid in practice:

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print(counter.value)
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer=init, initargs=(counter,))
    i = p.map_async(analyze_data, inputs, chunksize=1)
    i.wait()
    print(i.get())

  • Unfortunately, this example seems to be flawed, because `counter.value += 1` is not atomic between processes, so the value will be wrong if you run it long enough with several processes. (21 upvotes)
  • @jkp, how would you do it without the global variable? - I'm trying to use a class, but it's not as easy as it seems. See http://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-pythons-multiprocessing-pool-ma (3 upvotes)
  • Note that it should be `with counter.get_lock()`, not `with counter.value.get_lock():` (3 upvotes)
  • In line with what Eli says, a `Lock` must surround the `counter.value += 1` statement. See http://stackoverflow.com/questions/1233222/python-multiprocessing-easy-way-to-implement-a-simple-counter (2 upvotes)
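
To illustrate the point made in the first comment (my sketch, not from the thread; the answer's code above already takes the lock, so the comment presumably refers to an earlier revision): `counter.value += 1` is a read-modify-write, so concurrent increments without `get_lock()` can lose updates, while the locked version always reaches the expected total.

from multiprocessing import Process, Value

def bump(counter, n, use_lock):
    for _ in range(n):
        if use_lock:
            with counter.get_lock():
                counter.value += 1
        else:
            # unsynchronized read-modify-write: increments can be lost
            counter.value += 1

if __name__ == '__main__':
    for use_lock in (False, True):
        counter = Value('i', 0)
        procs = [Process(target=bump, args=(counter, 100000, use_lock))
                 for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # expected 400000; the unlocked run usually falls short
        print(use_lock, counter.value)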

ser*_*aut 35

A Counter class without the race-condition bug:

import multiprocessing

class Counter(object):
    def __init__(self):
        self.val = multiprocessing.Value('i', 0)

    def increment(self, n=1):
        with self.val.get_lock():
            self.val.value += n

    @property
    def value(self):
        return self.val.value
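
One way to use this class with a Pool (my sketch, not part of the original answer; it reuses the initializer pattern from jkp's answer, and the names init_worker and process_item are made up for illustration):

import multiprocessing

counter = None

def init_worker(shared_counter):
    # runs once in each worker; every worker then refers to the same Counter
    global counter
    counter = shared_counter

def process_item(x):
    counter.increment()
    return x * 2

if __name__ == '__main__':
    shared = Counter()   # the Counter class defined above
    with multiprocessing.Pool(initializer=init_worker, initargs=(shared,)) as p:
        p.map(process_item, range(100))
    print(shared.value)  # expected: 100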


lbs*_*eek 11

A very simple example, adapted from jkp's answer:

from multiprocessing import Pool, Value
from time import sleep

counter = Value('i', 0)
def f(x):
    global counter
    with counter.get_lock():
        counter.value += 1
    print("counter.value:", counter.value)
    sleep(1)
    return x

with Pool(4) as p:
    r = p.map(f, range(1000*1000))

  • I tested your code. The counter is not shared; each process has its own counter. (2 upvotes)
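
A likely explanation for that comment (my note, not from the thread): with the spawn start method used by default on Windows and macOS, every worker re-imports the module and builds its own counter = Value('i', 0), so the increments never reach the parent's Value. Passing the Value through the Pool initializer, as in jkp's answer, works with either start method. A minimal sketch:

from multiprocessing import Pool, Value

counter = None

def init(shared_counter):
    # store the Value handed over at worker start-up
    global counter
    counter = shared_counter

def f(x):
    with counter.get_lock():
        counter.value += 1
    return x

if __name__ == '__main__':
    shared = Value('i', 0)
    with Pool(4, initializer=init, initargs=(shared,)) as p:
        p.map(f, range(1000))
    print("final count:", shared.value)  # expected: 1000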

Bar*_*art 5

A faster Counter class which avoids using the built-in lock of Value twice:

import multiprocessing

class Counter(object):
    def __init__(self, initval=0):
        self.val = multiprocessing.RawValue('i', initval)
        self.lock = multiprocessing.Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    @property
    def value(self):
        return self.val.value

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue