Python: using multiprocessing to speed up merging Counters

Sea*_*ean 4 python multiprocessing recommender-systems

I am trying to build a very simple item recommender system based on the number of times items were purchased together.

So first I created an item2item dictionary of Counter-like counts:

# people purchased A with B 4 times, A with C 3 times.
item2item = {'A': {'B': 4, 'C': 3}, 'B': {'A': 4, 'C': 2}, 'C':{'A': 3, 'B': 2}}
# recommend items to a user who purchased A and C
samples_list = [['A', 'C'], ...]

So, for samples = ['A', 'C'], I recommend the top items of item2item['A'] + item2item['C'].
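As a concrete illustration of that merge-and-recommend step (a minimal sketch, assuming the inner mappings are collections.Counter objects rather than plain dicts, so that addition merges the counts):

```python
from collections import Counter

# hypothetical item2item with Counter values, mirroring the example above
item2item = {
    'A': Counter({'B': 4, 'C': 3}),
    'B': Counter({'A': 4, 'C': 2}),
    'C': Counter({'A': 3, 'B': 2}),
}

samples = ['A', 'C']

# merge the co-purchase counts of everything the user bought
combined = sum((item2item[s] for s in samples), Counter())

# rank by merged count, excluding items the user already purchased
recommendations = [item for item, _ in combined.most_common() if item not in samples]
print(recommendations)  # ['B']
```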

However, the merge is heavy for a large matrix, so I tried to use multiprocessing as follows:

from operator import add
from functools import reduce
from concurrent.futures import ProcessPoolExecutor
from collections import Counter

with ProcessPoolExecutor(max_workers=10) as pool:
    for samples in samples_list:
        # w/o PoolExecutor
        # combined = reduce(add, [item2item[s] for s in samples], Counter())
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        combined = future.result()

However, this does not speed things up at all.

I suspect the Counter in the reduce call is not being shared between processes, as discussed in the answers to "Python multiprocessing and a shared counter" and in https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes.
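Indeed, arguments passed to pool.submit are pickled and rebuilt in the worker process, so the child always operates on a copy, never a shared reference. A minimal sketch of that copy semantics, using pickle directly instead of spawning an actual process:

```python
import pickle
from collections import Counter

parent_counter = Counter({'A': 1})

# roughly what happens when an argument crosses the process boundary:
# it is serialized in the parent and reconstructed in the child
child_counter = pickle.loads(pickle.dumps(parent_counter))

child_counter['A'] += 1        # mutation in the "child" copy
print(parent_counter['A'])     # 1: the parent's Counter is untouched
print(child_counter is parent_counter)  # False: two distinct objects
```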

Any help is appreciated.

Boo*_*boo 6

Calling combined = future.result() blocks until the result is ready, so you never submit the next request to the pool until the previous one has completed. In other words, you never have more than one subprocess running at a time. At a minimum, you should change your code to:

from tqdm import tqdm  # progress bar (third-party)

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future)  # save it
    results = [f.result() for f in the_futures]  # all the results

Alternatively:

from concurrent.futures import as_completed
from tqdm import tqdm  # progress bar (third-party)

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future)  # save it
    for future in as_completed(the_futures):  # not necessarily in submission order
        result = future.result()  # do something with this

Also, if you do not specify max_workers in the ProcessPoolExecutor constructor, it defaults to the number of processors on your machine. There is no benefit to specifying a value greater than the number of processors you actually have.

更新

If you want to process results as soon as they complete and need a way to tie each result back to the original request, you can store the futures as keys in a dictionary whose values are the corresponding request arguments. In that case:

from concurrent.futures import as_completed
from tqdm import tqdm  # progress bar (third-party)

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = {}
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures[future] = samples  # map future to request
    for future in as_completed(the_futures):  # not necessarily in submission order
        samples = the_futures[future]  # the request
        result = future.result()  # the result