如何将具有多个参数的函数传递给python concurrent.futures.ProcessPoolExecutor.map()?

Sun*_*ear 12 python concurrency lambda python-3.x concurrent.futures

我想concurrent.futures.ProcessPoolExecutor.map()调用一个由2个或更多参数组成的函数.在下面的示例中,我使用了一个lambda函数并将其定义refnumberlist具有相同值的相同大小的数组.

第一个问题:有更好的方法吗?在numberlist的大小可能是数百万到数十亿个元素的情况下,因此ref大小必须遵循numberlist,这种方法不必要地占用宝贵的内存,我想避免.我这样做是因为我读取map函数将终止其映射,直到达到最短的数组结束.

import concurrent.futures as cf

nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3


def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
    print(n)
    if str(ref[0]) in n:
        print('match')

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
        print(type(n))
        print(n)
        if str(ref[0]) in n:
            print('match')
Run Code Online (Sandbox Code Playgroud)

运行上面的代码,我发现该map功能能够实现我想要的结果.但是,当我将相同的术语转移到concurrent.futures.ProcessPoolExecutor.map()时,python3.5失败并出现此错误:

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed
Run Code Online (Sandbox Code Playgroud)

问题2:为什么会出现此错误?如何使用concurrent.futures.ProcessPoolExecutor.map()调用具有多个参数的函数?

Blc*_*ght 10

要首先回答你的第二个问题,你会得到一个异常,因为lambda你正在使用的函数是不可选择的.由于Python使用pickle协议来序列化主进程和ProcessPoolExecutor工作进程之间传递的数据,因此这是一个问题.目前尚不清楚你为什么要使用它lambda.你拥有的lambda有两个参数,就像原始函数一样.您可以_findmatch直接使用而不是lambda它应该工作.

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_findmatch, numberlist, ref):
        ...
Run Code Online (Sandbox Code Playgroud)

关于传递第二个常量参数而不创建巨型列表的第一个问题,您可以通过多种方式解决这个问题.一种方法可能是用于itertools.repeat创建可迭代对象,该对象在迭代时永远重复相同的值.

但是更好的方法可能是编写一个额外的函数来为你传递常量参数.(也许这就是你尝试使用lambda函数的原因?)如果您使用的函数可以在模块的顶级命名空间访问,它应该可以工作:

def _helper(x):
    return _findmatch(x, 5)

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_helper, numberlist):
        ...
Run Code Online (Sandbox Code Playgroud)


mko*_*vas 6

关于您的第一个问题,我是否正确理解您想传递一个参数,该参数的值仅在您调用时确定,map但对于映射函数的所有实例都是常量?如果是这样,我将map使用从“模板函数”派生的函数执行此操作,并使用以下方法将第二个参数(ref在您的示例中)烘焙到其中functools.partial

from functools import partial
refval = 5

def _findmatch(ref, listnumber):  # arguments swapped
    ...

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(partial(_findmatch, refval), numberlist):
        ...
Run Code Online (Sandbox Code Playgroud)

关于。问题 2,第一部分:我还没有找到试图对应该并行执行的函数进行pickle(序列化)的确切代码段,但是这听起来很自然——不仅是参数,还有该功能必须以某种方式转移给工作人员,并且可能必须为此转移进行序列化。partial函数可以被腌制而lambdas 不能被其他地方提到的事实,例如这里:https : //stackoverflow.com/a/19279016/6356764

关于。问题 2,第二部分:如果您想调用具有多个参数ProcessPoolExecutor.map的函数,您可以将函数作为第一个参数传递给它,然后是该函数的第一个参数的可迭代对象,然后是其第二个参数的可迭代对象等。在你的情况下:

for n in executor.map(_findmatch, numberlist, ref):
    ...
Run Code Online (Sandbox Code Playgroud)


tde*_*ney 6

(1)无需列出清单。您可以itertools.repeat用来创建仅重复某些值的迭代器。

(2)您需要将命名函数map传递给,因为它将被传递给子进程执行。map使用pickle协议发送东西,lambda不能被腌制,因此不能成为地图的一部分。但这完全没有必要。lambda所做的全部工作就是调用带有2个参数的2个参数函数。完全将其删除。

工作代码是

import concurrent.futures as cf
import itertools

nmax = 10
numberlist = range(nmax)
workers = 3

def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
        print(type(n))
        print(n)
        #if str(ref[0]) in n:
        #    print('match')
Run Code Online (Sandbox Code Playgroud)