Threads not running in parallel with ThreadPoolExecutor in Python

Lea*_* D. 4 python concurrency machine-learning python-multithreading

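I'm new to Python threads and I'm experimenting with them. When I run something in threads (judging by the output I print), it never seems to run in parallel. My function also takes the same time as it did before I used concurrent.futures (ThreadPoolExecutor). I have to compute the gain of some attributes on a dataset (I can't use a library for this). Since I have about 1024 attributes and the function takes about a minute to run (and I have to call it inside an iteration), I decided to split the attributes array into 10 parts (just as an example) and run the gain(attribute) function separately on each sub-array. So I did the following (leaving out some unnecessary extra code):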

def calculate_gains(self):
    splited_attributes = np.array_split(self.attributes, 10)
    result = {}
    for atts in splited_attributes:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(self.calculate_gains_helper, atts)
            return_value = future.result()
            self.gains = {**self.gains, **return_value}

Here is calculate_gains_helper:

def calculate_gains_helper(self, attributes):
    inter_result = {}
    for attribute in attributes:
        inter_result[attribute] = self.gain(attribute)
    return inter_result

Am I doing something wrong? I've read some other, older posts but couldn't find anything useful. Any help is much appreciated!

mic*_*del 6

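Because of the GIL, Python threads don't execute Python bytecode in parallel (at least in the CPython implementation), so CPU-bound work like this gains nothing from threads. Use processes with ProcessPoolExecutor to get true parallelism: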

with concurrent.futures.ProcessPoolExecutor() as executor:
    ...
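A fuller sketch of the process-based approach applied to the question's setup. Everything here is hypothetical: gain() is a stand-in information-gain function over 0/1 columns, compute_gains_chunk() plays the role of calculate_gains_helper, and split() mimics np.array_split without needing NumPy. Worker functions must live at module top level so they can be pickled, and the if __name__ == "__main__": guard is needed where workers are started with "spawn" (the default on Windows and macOS).

```python
# Minimal sketch, assuming a toy dataset of 0/1 attribute columns and 0/1 labels.
# gain(), compute_gains_chunk(), split() and the data are hypothetical stand-ins.
import math
import random
from collections import Counter
from concurrent.futures import ProcessPoolExecutor


def entropy(labels):
    """Shannon entropy (in bits) of a sequence of class labels."""
    n = len(labels)
    return -sum((c / n) * math.log2(c / n) for c in Counter(labels).values())


def gain(column, labels):
    """Information gain of splitting `labels` on the values in `column`."""
    n = len(labels)
    remainder = 0.0
    for value in set(column):
        subset = [y for x, y in zip(column, labels) if x == value]
        remainder += len(subset) / n * entropy(subset)
    return entropy(labels) - remainder


def compute_gains_chunk(chunk, data, labels):
    """Worker: compute the gain of every attribute index in one chunk."""
    return {a: gain(data[a], labels) for a in chunk}


def split(items, n_chunks):
    """Split a list into n_chunks contiguous, nearly equal parts (like np.array_split)."""
    k, m = divmod(len(items), n_chunks)
    return [items[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n_chunks)]


def calculate_gains_parallel(data, labels, n_chunks=10):
    chunks = split(list(range(len(data))), n_chunks)
    gains = {}
    with ProcessPoolExecutor() as executor:
        # Submit every chunk before waiting on any result, so the workers run concurrently.
        futures = [executor.submit(compute_gains_chunk, c, data, labels) for c in chunks]
        for future in futures:
            gains.update(future.result())
    return gains


if __name__ == "__main__":
    random.seed(0)
    labels = [random.randint(0, 1) for _ in range(500)]
    data = [[random.randint(0, 1) for _ in range(500)] for _ in range(40)]
    gains = calculate_gains_parallel(data, labels)
    print(len(gains), max(gains.values()))
```

Every task pickles data and labels into a worker process, so this pays off only when each chunk does substantial work, as the minute-long gain computation in the question does.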


小智 1

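I had the same problem and fixed it by moving the loop inside the ThreadPoolExecutor context. Otherwise, every iteration has to wait for the with block to exit, which waits for the submitted task to finish, before the next iteration can start.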

Here is a possible fix for your code:

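import concurrent.futures

import numpy as np

def calculate_gains(self):
    splited_attributes = np.array_split(self.attributes, 10)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit every chunk first; calling future.result() right after each
        # submit() would block and run the chunks one at a time again.
        futures = [executor.submit(self.calculate_gains_helper, atts)
                   for atts in splited_attributes]
        # Merge each chunk's results as soon as it finishes.
        for future in concurrent.futures.as_completed(futures):
            self.gains = {**self.gains, **future.result()}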
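Also note that future.result() blocks until that particular task finishes, so calling it immediately after each submit() runs the chunks one at a time even inside a single executor. A minimal timing sketch, assuming a hypothetical slow_task() that only sleeps (time.sleep releases the GIL, so threads can genuinely overlap here):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(x):
    """Hypothetical task: sleep briefly (releasing the GIL), then return x squared."""
    time.sleep(0.2)
    return x * x


def run_blocking(items):
    """Waits on each future right after submitting it: tasks run one at a time."""
    results = []
    with ThreadPoolExecutor(max_workers=len(items)) as executor:
        for x in items:
            results.append(executor.submit(slow_task, x).result())
    return results


def run_concurrent(items):
    """Submits every task first, then collects the results: tasks overlap."""
    with ThreadPoolExecutor(max_workers=len(items)) as executor:
        futures = [executor.submit(slow_task, x) for x in items]
        return [f.result() for f in futures]


if __name__ == "__main__":
    for runner in (run_blocking, run_concurrent):
        start = time.perf_counter()
        out = runner([1, 2, 3])
        print(runner.__name__, out, f"{time.perf_counter() - start:.2f}s")
```

On a typical machine, run_blocking should take roughly 0.6 s and run_concurrent roughly 0.2 s; both return the same results.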

To better illustrate what I mean, here is some sample code.

Below is the non-working code. The threads run one after another (sequentially)...

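from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

def t(reference):
    # Print 10 numbered lines, pausing 1 s between them.
    for i in range(10):
        print(f"{reference} :{i}")
        sleep(1)

futures = []
refs = ["a", "b", "c"]

for i in refs:
    # A new executor per iteration: leaving the "with" block waits for the
    # task to finish, so the next task is only submitted afterwards.
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures.append(executor.submit(t, i))

    for future in as_completed(futures):
        print(future.result())  # t returns None, so this prints None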
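Why the version above is sequential: the concurrent.futures documentation says that a with block shuts the executor down on exit as if shutdown(wait=True) had been called, so each iteration of the outer loop blocks until its single task is done. A minimal sketch of that equivalence, using a hypothetical work() task:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(tag, log):
    """Hypothetical task: sleep briefly, then record that it finished."""
    time.sleep(0.1)
    log.append(tag)


log = []
executor = ThreadPoolExecutor(max_workers=3)
try:
    executor.submit(work, "a", log)
finally:
    # This is what leaving a "with ThreadPoolExecutor(...)" block does:
    # it waits for every submitted task to complete.
    executor.shutdown(wait=True)

log.append("after shutdown")  # only reached once work() has finished
print(log)
```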

Here is the fixed code:

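from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

def t(reference):
    # Print 10 numbered lines, pausing 1 s between them.
    for i in range(10):
        print(f"{reference} :{i}")
        sleep(1)

futures = []
refs = ["a", "b", "c"]

# One executor for all tasks: every task is submitted before any result
# is awaited, so the three threads run concurrently.
with ThreadPoolExecutor(max_workers=3) as executor:  #swapped
    for i in refs:                                   #swapped
        futures.append(executor.submit(t, i))

    for future in as_completed(futures):
        print(future.result())  # t returns None, so this prints None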
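As an alternative to collecting futures by hand, Executor.map submits every item up front and yields the results in input order. A minimal sketch, assuming t is changed to return its reference so there is something meaningful to print (the t above returns None) and shortened to 3 steps of 0.1 s:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def t(reference):
    # Shortened version of t: 3 steps of 0.1 s instead of 10 steps of 1 s.
    for i in range(3):
        print(f"{reference} :{i}")
        sleep(0.1)
    return reference  # hypothetical return value, so map() has results to collect


with ThreadPoolExecutor(max_workers=3) as executor:
    # map() submits all three calls immediately; results come back in input order.
    results = list(executor.map(t, ["a", "b", "c"]))

print(results)  # ['a', 'b', 'c']
```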

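You can try this in a terminal and check the output. Note that t spends most of its time in sleep(), which releases the GIL, so the threads overlap here; CPU-bound work such as computing gains will still not speed up with threads in CPython.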