python多处理初始化的开销比好处更糟糕

Den*_*nis 5 python multiprocessing

我想在 python 3.7 中使用 trie 搜索,以便将字符串与某些给定的单词匹配。trie 搜索算法实际上非常快,但是我也想使用我的 CPU 的所有内核。假设我的电脑有 8 个内核,我想使用其中的 7 个。因此,我将我的单词数据库拆分为 7 个同样大的列表,并创建了每个列表。(这是并行化代码的基本思想)

但是,当我从多处理模块调用 Process() 时,Process().start() 方法可能会在真实数据库上占用几秒钟的时间。(搜索本身大约需要一微秒)。

老实说,我还不是一个专业的程序员,这意味着我可能在代码中犯了一些重大错误。有人看到这个过程的开始如此缓慢的原因吗?

请考虑到我使用比下面的 trie 更大的数据库测试了脚本。我还测试了每次只调用 1 个进程的脚本,这也明显变慢了。我想提供更少的代码,但是我认为很高兴看到运行问题。如果需要,我还可以提供其他信息。

import string
import sys
import time

from multiprocessing import Process, Manager
from itertools import combinations_with_replacement


class TrieNode:

    def __init__(self):

        self.isString = False
        self.children = {}

    def insertString(self, word, root):
        currentNode = root
        for char in word:
            if char not in currentNode.children:
                currentNode.children[char] = TrieNode()
            currentNode = currentNode.children[char]
        currentNode.isString = True

    def findStrings(self, prefix, node, results):
        # Hänge das Ergebnis an, wenn ein Ende gefunden wurde
        if node.isString:
            results.append(prefix)

        for char in node.children:
            self.findStrings(prefix + char, node.children[char], results)

    def findSubStrings(self, start_prefix, root, results):
        currentNode = root
        for char in start_prefix:

            # Beende Schleife auf fehlende Prefixes oder deren Kinder
            if char not in currentNode.children:
                break
            # Wechsle zu Kindern in anderem Falle
            else:
                currentNode = currentNode.children[char]

        # Verwende findStrings Rekursiv zum auffinden von End-Knoten
        self.findStrings(start_prefix, currentNode, results)

        return results


def gen_word_list(num_words, min_word_len=4, max_word_len=10):
    wordList = []
    total_words = 0
    for long_word in combinations_with_replacement(string.ascii_lowercase, max_word_len):
        wordList.append(long_word)
        total_words += 1

        if total_words >= num_words:
            break

        for cut_length in range(1, max_word_len-min_word_len+1):
            wordList.append(long_word[:-cut_length])
            total_words += 1
            if total_words >= num_words:
                break

    return wordList


if __name__ == '__main__':
    # Sample word list

    wordList = gen_word_list(1.5 * 10**5)

    # Configs
    try:
        n_cores = int(sys.argv[-1] or 7)
    except ValueError:
        n_cores = 7

    # Repetitions to do in order to estimate the runtime of a single run
    num_repeats = 20
    real_num_repeats = n_cores * num_repeats

    # Creating Trie
    root = TrieNode()

    # Adding words
    for word in wordList:
        root.insertString(word, root)

    # Extending trie to use it on multiple cores at once
    multiroot = [root] * n_cores

    # Measure time
    print('Single process ...')
    t_0 = time.time()
    for i in range(real_num_repeats):
        r = []
        root.findSubStrings('he', root, r)
    single_proc_time = (time.time()-t_0)
    print(single_proc_time/real_num_repeats)

    # using multicore to speed up the process
    man = Manager()

    # Loop to test the multicore Solution
    # (Less repetitions are done to compare the timings to the single-core solution)
    print('\nMultiprocess ...')
    t_00 = time.time()
    p_init_time = 0
    procs_append_time = 0
    p_start_time = 0
    for i in range(num_repeats):

        # Create Share-able list
        res = man.list()

        procs = []

        for i in range(n_cores):
            t_0 = time.time()
            p = Process(target=multiroot[i].findSubStrings, args=('a', multiroot[i], res))
            t_1 = time.time()
            p_init_time += t_1 - t_0
            procs.append(p)
            t_2 = time.time()
            procs_append_time += t_2 - t_1
            p.start()
            p_start_time += time.time() - t_2

        for p in procs:
            p.join()
    multi_proc_time = time.time() - t_00
    print(multi_proc_time / real_num_repeats)
    init_overhead = p_init_time / single_proc_time
    append_overhead = procs_append_time / single_proc_time
    start_overhead = p_start_time / single_proc_time
    total_overhead = (multi_proc_time - single_proc_time) / single_proc_time
    print(f"Process(...) overhead: {init_overhead:.1%}")
    print(f"procs.append(p) overhead: {append_overhead:.1%}")
    print(f"p.start() overhead: {start_overhead:.1%}")
    print(f"Total overhead: {total_overhead:.1%}")


Run Code Online (Sandbox Code Playgroud)
Single process ...
0.007229958261762347

Multiprocess ...
0.7615800397736686
Process(...) overhead: 0.9%
procs.append(p) overhead: 0.0%
p.start() overhead: 8.2%
Total overhead: 10573.8%

Run Code Online (Sandbox Code Playgroud)

imp*_*ren 6

大概的概念

有很多事情需要考虑,其中大部分已经在Multiprocessing > Programming Guidelines 中进行了描述。最重要的是要记住,您实际上正在处理多个进程,因此有 3(或 4)种处理变量的方法:

  • ctypes 共享状态变量上的同步包装器(如 multiprocessing.Value)。实际变量在内存中始终是“一个对象”,默认情况下包装器使用“锁定”来设置/获取实际值。

  • 代理(如Manager().list())。这些变量类似于共享状态变量,但被放置在特殊的“服务器进程”中,对它们的所有操作实际上都是在管理器进程和活动进程之间发送腌制值:

    • results.append(x)腌制x并将其从管理器进程发送到进行此调用的活动进程,然后将其取消腌制

    • 任何其他访问results(例如len(results),对结果的迭代)都涉及相同的酸洗/发送/取消酸洗过程。

    因此,对于公共变量,代理通常比任何其他方法慢得多,并且在许多情况下,即使与单进程运行相比,使用管理器进行“本地”并行化也会产生更差的性能。但是管理服务器可以远程使用,所以当你想使用分布在多台机器上的工作人员并行化工作时使用它们是合理的

  • 在子流程创建期间可用的对象。对于“fork”启动方法,在创建子进程期间所有可用的对象仍然可用并且“不共享”,因此更改它们只会“在子进程本地”更改它。但是在它们被改变之前,每个进程真正“共享”每个这样的对象的内存,所以:

    • 如果它们被“只读”使用,则不会复制或“通信”任何内容。

    • 如果它们被更改,那么它们将被复制到子流程中,并且副本正在被更改。这称为写时复制或 COW。请注意,对对象进行新的引用,例如分配一个变量来引用它,或将其附加到列表中会增加对象的 ref_count,这被认为是“更改”。

行为也可能因“启动方法”而异:例如,对于“spawn”/“forkserver”方法,可更改的全局变量并不是真正的“相同对象”,子进程看到的值可能与父进程中的值不同。

因此multiroot[i](在 中使用Process(target=..., args=(..., multiroot[i], ...)))的初始值是共享的,但是:

  • 如果您没有使用 'fork' start 方法(并且默认情况下 Windows 没有使用它),那么每个子进程的所有参数至少会被腌制一次。所以start如果multiroot[i].children很大,可能需要很长时间。

  • 即使您正在使用 fork: 最初multiroot[i]似乎是共享的而不是复制的,但是我不确定在findSubStrings方法内部分配变量时会发生什么(例如currentNode = ...) - 也许它会导致写时复制(COW)等整个实例的TrieNode正在被复制。

可以做些什么来改善这种情况:

  • 如果您正在使用forkstart 方法,那么请确保“数据库”对象(TrieNode实例)是真正只读的,并且没有在其中包含变量赋值的方法。例如,您可以移动findSubStrings到另一个类,并确保instance.insertString在启动子进程之前调用所有。

  • 您正在使用man.list()实例作为results参数findSubStrings。这意味着为每个子流程创建了一个不同的“包装器”,并且所有results.append(prefix)操作都是 pickling prefix,然后将其发送到服务器进程。如果您使用Pool的进程数量有限,那么这没什么大不了的。如果您生成大量子进程,则可能会影响性能。而且我认为默认情况下它们都使用“锁定”,因此并发追加可能相对较慢。如果项目的顺序results无关紧要(我对前缀树没有经验,也不记得它背后的理论),那么您可以完全避免与并发相关的任何开销 results.append

    • resultsfindSubStrings方法内创建新列表。根本不要用res = man.list()
    • 要获得“最终”结果:迭代pool.apply_async())返回的每个结果对象;得到结果;“合并它们”。

使用弱引用

currentNode = root在 findSubStrings 中使用将导致root. 这就是为什么弱引用 ( currentNodeRef = weakref.ref(root)) 可以带来一些额外的好处。

例子

import string
import sys
import time
import weakref

from copy import deepcopy
from multiprocessing import Pool
from itertools import combinations_with_replacement


class TrieNode:

    def __init__(self):

        self.isString = False
        self.children = {}

    def insertString(self, word, root):
        current_node = root
        for char in word:
            if char not in current_node.children:
                current_node.children[char] = TrieNode()
            current_node = current_node.children[char]
        current_node.isString = True


# findStrings: not a method of TrieNode anymore, and works with reference to node.
def findStrings(prefix, node_ref, results):
    # Hänge das Ergebnis an, wenn ein Ende gefunden wurde
    if node_ref().isString:
        results.append(prefix)

    for char in node_ref().children:
        findStrings(prefix + char, weakref.ref(node_ref().children[char]), results)


# findSubStrings: not a method of TrieNode anymore, and works with reference to node.
def findSubStrings(start_prefix, node_ref, results=None):

    if results is None:
        results = []
    current_node_ref = node_ref

    for char in start_prefix:

        # Beende Schleife auf fehlende Prefixes oder deren Kinder
        if char not in current_node_ref().children:
            break
        # Wechsle zu Kindern in anderem Falle
        else:
            current_node_ref = weakref.ref(current_node_ref().children[char])

    # Verwende findStrings Rekursiv zum auffinden von End-Knoten
    findStrings(start_prefix, current_node_ref, results)

    return results


def gen_word_list(num_words, min_word_len=4, max_word_len=10):
    wordList = []
    total_words = 0
    for long_word in combinations_with_replacement(string.ascii_lowercase, max_word_len):
        wordList.append(long_word)
        total_words += 1

        if total_words >= num_words:
            break

        for cut_length in range(1, max_word_len-min_word_len+1):
            wordList.append(long_word[:-cut_length])
            total_words += 1
            if total_words >= num_words:
                break

    return wordList


if __name__ == '__main__':
    # Sample word list

    wordList = gen_word_list(1.5 * 10**5)

    # Configs
    try:
        n_cores = int(sys.argv[-1] or 7)
    except ValueError:
        n_cores = 7

    # Repetitions to do in order to estimate the runtime of a single run
    real_num_repeats = 420
    simulated_num_repeats = real_num_repeats // n_cores

    # Creating Trie
    root = TrieNode()

    # Adding words
    for word in wordList:
        root.insertString(word, root)

    # Create tries for subprocesses:
    multiroot = [deepcopy(root) for _ in range(n_cores)]
    # NOTE: actually all subprocesses can use the same `root`, but let's copy them to simulate
    # that we are using different tries when splitting job to sub-jobs

    # localFindSubStrings: defined after `multiroot`, so `multiroot` can be used as "shared" variable
    def localFindSubStrings(start_prefix, root_index=None, results=None):
        if root_index is None:
            root_ref = weakref.ref(root)
        else:
            root_ref = weakref.ref(multiroot[root_index])
        return findSubStrings(start_prefix, root_ref, results)

    # Measure time
    print('Single process ...')
    single_proc_num_results = None
    t_0 = time.time()
    for i in range(real_num_repeats):
        iteration_results = localFindSubStrings('help', )
        if single_proc_num_results is None:
            single_proc_num_results = len(iteration_results)
    single_proc_time = (time.time()-t_0)
    print(single_proc_time/real_num_repeats)

    # Loop to test the multicore Solution
    # (Less repetitions are done to compare the timings to the single-core solution)
    print('\nMultiprocess ...')
    p_init_time = 0
    apply_async_time = 0
    results_join_time = 0

    # Should processes be joined between repeats (simulate single job on multiple cores) or not (simulate multiple jobs running simultaneously)
    PARALLEL_REPEATS = True

    if PARALLEL_REPEATS:
        t_0 = time.time()
        pool = Pool(processes=n_cores)
        t_1 = time.time()
        p_init_time += t_1 - t_0
        async_results = []

    final_results = []

    t_00 = time.time()

    for repeat_num in range(simulated_num_repeats):

        final_result = []
        final_results.append(final_result)

        if not PARALLEL_REPEATS:
            t_0 = time.time()
            pool = Pool(processes=n_cores)
            t_1 = time.time()
            p_init_time += t_1 - t_0
            async_results = []
        else:
            t_1 = time.time()

        async_results.append(
            (
                final_result,
                pool.starmap_async(
                    localFindSubStrings,
                    [('help', core_num) for core_num in range(n_cores)],
                )
            )
        )
        t_2 = time.time()
        apply_async_time += t_2 - t_1

        if not PARALLEL_REPEATS:
            for _, a_res in async_results:
                for result_part in a_res.get():
                    t_3 = time.time()
                    final_result.extend(result_part)
                    results_join_time += time.time() - t_3
            pool.close()
            pool.join()

    if PARALLEL_REPEATS:
        for final_result, a_res in async_results:
            for result_part in a_res.get():
                t_3 = time.time()
                final_result.extend(result_part)
                results_join_time += time.time() - t_3
        pool.close()
        pool.join()

    multi_proc_time = time.time() - t_00

    # Work is not really parallelized, instead it's just 'duplicated' over cores,
    # and so we divide using `real_num_repeats` (not `simulated_num_repeats`)
    print(multi_proc_time / real_num_repeats)
    init_overhead = p_init_time / single_proc_time
    apply_async_overhead = apply_async_time / single_proc_time
    results_join_percent = results_join_time / single_proc_time
    total_overhead = (multi_proc_time - single_proc_time) / single_proc_time
    print(f"Pool(...) overhead: {init_overhead:.1%}")
    print(f"pool.starmap_async(...) overhead: {apply_async_overhead:.1%}")
    print(f"Results join time percent: {results_join_percent:.1%}")
    print(f"Total overhead: {total_overhead:.1%}")

    for iteration_results in final_results:
        num_results = len(iteration_results) / n_cores
        if num_results != single_proc_num_results:
            raise AssertionError(f'length of results should not change! {num_results} != {single_proc_num_results}')

Run Code Online (Sandbox Code Playgroud)

笔记:

  • PARALLEL_REPEATS=True 模拟多个作业的运行(例如,每个作业应该针对不同的前缀启动,但在示例中,我使用相同的前缀来为每次运行提供一致的“负载”),并且每个作业在所有内核上都是“并行化的”。
  • PARALLEL_REPEATS=False 模拟在所有内核上并行运行的单个作业,它比单进程解决方案慢。
  • 似乎只有当池中的每个工人发出apply_async超过 1 次时,并行性才会更好。

示例输出:

Single process ...
0.007109369550432477

Multiprocess ...
0.002928720201764788
Pool(...) overhead: 1.3%
pool.apply_async(...) overhead: 1.5%
Results join time percent: 1.8%
Total overhead: -58.8%

Run Code Online (Sandbox Code Playgroud)


Den*_*nis 2

首先,我要感谢所有参与的人,因为每个答案都有助于解决问题。

正如第一条评论指出的那样,每次创建一个新进程都会导致 python 将所需的数据转移到该进程中。这可能需要几秒钟的时间,并会导致不希望的延迟。

为我带来最终解决方案的是在程序启动期间使用多处理库的 Process 类创建一次进程(每个核心一个)。

然后,您可以使用同一模块的 Pipe 类与进程进行通信。

我发现这里的乒乓球示例确实很有帮助:https://www.youtube.com/watch ?v=s1SkCYMnfbY&t=900s

它仍然不是最佳的,因为多个管道在同一时间尝试与进程通信会导致进程崩溃。

但是,我应该能够使用队列解决这个问题。如果有人对解决方案感兴趣,请随时询问。