如何在Python多处理中将Pool.map与Array(共享内存)结合起来?

Jer*_*rks 52 python pool shared-memory multiprocessing

我有一个非常大(只读)的数据数组,我希望由多个进程并行处理.

我喜欢Pool.map函数,并希望用它来并行计算该数据的函数.

我看到可以使用Value或Array类在进程之间使用共享内存数据.但是当我尝试使用它时,我得到一个RuntimeError:'SynchronizedString对象只应在使用Pool.map函数时通过继承在进程之间共享:

这是我想要做的简化示例:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
Run Code Online (Sandbox Code Playgroud)

谁能告诉我这里做错了什么?

所以我想要做的是将新创建的共享内存分配数组的信息传递给进程池中创建的进程.

rob*_*nce 42

我刚刚看到赏金,再次尝试;)

基本上我认为错误消息意味着它所说的 - 多处理共享内存数组不能作为参数传递(通过pickling).序列化数据没有意义 - 重点是数据是共享内存.所以你必须使共享数组全局化.我认为把它作为模块的属性更简洁,就像在我的第一个答案中一样,但是在你的例子中将它作为全局变量保留也很有效.考虑到你不希望在fork之前设置数据,这是一个修改过的例子.如果你想拥有多个可能的共享数组(这就是为什么你想传递toShare作为参数),你可以类似地创建共享数组的全局列表,并将索引传递给count_it(这将成为for c in toShare[i]:).

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )
Run Code Online (Sandbox Code Playgroud)

[编辑:由于没有使用fork,上面的内容在Windows上不起作用.但是,下面的内容适用于Windows,仍然使用Pool,所以我认为这是最接近你想要的:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )
Run Code Online (Sandbox Code Playgroud)

不知道为什么map不会Pickle数组但是Process和Pool会 - 我想也许它已经在windows上的子进程初始化时传输了.请注意,数据仍然在fork之后设置.


rob*_*nce 7

如果数据是只读的,只需在Pool 的 fork之前将其设为模块中的变量。然后所有子进程都应该能够访问它,如果您不写入它,它就不会被复制。

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )
Run Code Online (Sandbox Code Playgroud)

如果您确实想尝试使用 Array,尽管您可以尝试使用lock=False关键字参数(默认情况下为 true)。


Acu*_*nus 6

如果你看到:

RuntimeError:同步对象只能通过继承在进程之间共享

考虑使用,multiprocessing.Manager因为它没有这个限制。经理的工作考虑到它可能完全在一个单独的进程中运行。

import ctypes
import multiprocessing

# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1
Run Code Online (Sandbox Code Playgroud)


小智 5

我看到的问题是Pool不支持通过其参数列表来搜索共享数据.这就是错误消息所指的"对象应该只通过继承在进程之间共享".如果要使用Pool类共享共享数据,则需要继承共享数据,即全局共享数据.

如果需要显式传递它们,则可能必须使用multiprocessing.Process.这是你重写的例子:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),
Run Code Online (Sandbox Code Playgroud)

输出:('s',9)('a',2)('b',3)('d',12)

队列元素的排序可能会有所不同.

为了使它更通用且与Pool类似,您可以创建固定的N个进程,将密钥列表拆分为N个部分,然后使用包装器函数作为Process目标,它将为列表中的每个键调用count_it它被传递,如:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)
Run Code Online (Sandbox Code Playgroud)