Python 中的多处理中的多线程

use*_*244 9 python multithreading multiprocessing concurrent.futures

我正在使用并发.futures 模块来进行多处理和多线程处理。我在具有 16GB RAM、英特尔 i7 第八代处理器的 8 核机器上运行它。我在 Python 3.7.2 甚至 Python 3.8.2 上尝试过这个

import concurrent.futures
import time
Run Code Online (Sandbox Code Playgroud) 获取列表并将每个元素乘以 2
def double_value(x):
  y = []
  for elem in x:
    y.append(2 *elem)
  return y
Run Code Online (Sandbox Code Playgroud) 将 elem 乘以 2
def double_single_value(x):
  return 2* x
Run Code Online (Sandbox Code Playgroud) 定义一个
import numpy as np
a = np.arange(100000000).reshape(100, 1000000)
Run Code Online (Sandbox Code Playgroud) 运行多个线程并将每个 elem 乘以 2 的函数
 def get_double_value(x):
  with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(double_single_value, x)
  return list(results)
Run Code Online (Sandbox Code Playgroud)

下面显示的代码运行时间为 115 秒。这仅使用多处理。这段代码的CPU利用率是100%

t = time.time()

with concurrent.futures.ProcessPoolExecutor() as executor:
  my_results = executor.map(double_value, a)
print(time.time()-t)
Run Code Online (Sandbox Code Playgroud)

下面的函数花费了超过 9 分钟并消耗了系统的所有 RAM,然后系统杀死了所有进程。此外,这段代码期间的 CPU 利用率并未达到 100%(~85%)

t = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
  my_results = executor.map(get_double_value, a)

print(time.time()-t)
Run Code Online (Sandbox Code Playgroud)

我真的很想明白:

1)为什么首先拆分进行多个处理然后运行尝试多线程的代码运行速度并不比仅运行多处理的代码运行得更快?

(我已经阅读了许多描述多处理和多线程的帖子,我得到的关键之一是多线程是针对 I/O 进程的,而多处理是针对 CPU 进程的?)

2)是否有更好的方法在多处理中进行多线程处理,以最大程度地利用分配的核心(或CPU)?

3)为什么最后一段代码消耗了所有的RAM?是因为多线程吗?

Luc*_*uez 5

您可以将并发与并行混合在一起。 为什么?你可以有你的正当理由。想象一下您必须发出一堆请求,同时尽可能快地处理它们的响应(例如,将 XML 转换为 JSON)。

我做了一些测试,这是结果。在每次测试中,我混合不同的解决方法来打印 16000 次(我有 8 个核心和 16 个线程)。

并行性multiprocessing, 并发性asyncio

最快,1.1152372360229492秒。

import asyncio
import multiprocessing
import os
import psutil
import threading
import time

async def print_info(value):
    await asyncio.sleep(1)
    print(
        f"THREAD: {threading.get_ident()}",
        f"PROCESS: {os.getpid()}",
        f"CORE_ID: {psutil.Process().cpu_num()}",
        f"VALUE: {value}",
    )

async def await_async_logic(values):
    await asyncio.gather(
        *(
            print_info(value)
            for value in values
        )
    )

def run_async_logic(values):
    asyncio.run(await_async_logic(values))

def multiprocessing_executor():
    start = time.time()
    with multiprocessing.Pool() as multiprocessing_pool:
        multiprocessing_pool.map(
            run_async_logic,
            (range(1000 * x, 1000 * (x + 1)) for x in range(os.cpu_count())),
        )
    end = time.time()
    print(end - start)

multiprocessing_executor()
Run Code Online (Sandbox Code Playgroud)

非常重要的提示:有了asyncio我,我可以随心所欲地发送垃圾邮件任务。例如,我可以将值从 更改100010000生成 160000 个打印件,并且没有问题(我测试了它,它花了我 2.0210490226745605 秒)。

并行性multiprocessing, 并发性threading

另一种选择是 1.6983509063720703 秒。

import multiprocessing
import os
import psutil
import threading
import time

def print_info(value):
    time.sleep(1)
    print(
        f"THREAD: {threading.get_ident()}",
        f"PROCESS: {os.getpid()}",
        f"CORE_ID: {psutil.Process().cpu_num()}",
        f"VALUE: {value}",
    )

def multithreading_logic(values):
    threads = []
    for value in values:
        threads.append(threading.Thread(target=print_info, args=(value,)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

def multiprocessing_executor():
    start = time.time()
    with multiprocessing.Pool() as multiprocessing_pool:
        multiprocessing_pool.map(
            multithreading_logic,
            (range(1000 * x, 1000 * (x + 1)) for x in range(os.cpu_count())),
        )
    end = time.time()
    print(end - start)

multiprocessing_executor()
Run Code Online (Sandbox Code Playgroud)

非常重要的注意事项:使用这种方法,我不能像我想要的那样发送垃圾邮件。如果我将值从 更改为 ,100010000会得到RuntimeError: can't start new thread。我还想说,我印象深刻,因为我认为这种方法在各个方面都会比 asyncio 更好,但事实恰恰相反。

并行性和并发性concurrent.futures

非常慢,50.08251595497131 秒。

import os
import psutil
import threading
import time
from concurrent.futures import thread, process

def print_info(value):
    time.sleep(1)
    print(
        f"THREAD: {threading.get_ident()}",
        f"PROCESS: {os.getpid()}",
        f"CORE_ID: {psutil.Process().cpu_num()}",
        f"VALUE: {value}",
    )

def multithreading_logic(values):
    with thread.ThreadPoolExecutor() as multithreading_executor:
        multithreading_executor.map(
            print_info,
            values,
        )

def multiprocessing_executor():
    start = time.time()
    with process.ProcessPoolExecutor() as multiprocessing_executor:
        multiprocessing_executor.map(
            multithreading_logic,
            (range(1000 * x, 1000 * (x + 1)) for x in range(os.cpu_count())),
        )
    end = time.time()
    print(end - start)

multiprocessing_executor()
Run Code Online (Sandbox Code Playgroud)

非常重要的注意事项:使用此方法,与使用 一样asyncio,我可以根据需要发送任意数量的垃圾邮件。例如,我可以将值从 更改100010000生成 160000 个打印件,并且没有问题(时间除外)。

额外说明

为了发表此评论,我修改了测试,使其仅进行 1600 次打印(修改每个测试中的1000100)。

当我从 asyncio 中删除并行性时,执行需要 16.090194702148438 秒。另外,如果我用 替换await asyncio.sleep(1)time.sleep(1)则需要 160.1889989376068 秒。

从多线程选项中删除并行性,执行需要 16.24941658973694 秒。现在我印象深刻。没有多处理的多线程给我带来了良好的性能,与 asyncio 非常相似。

从第三个选项中删除并行性,执行时间为 80.15227723121643 秒。


len*_*nik 3

正如您所说:“我已经阅读了许多描述多处理和多线程的帖子,我得到的关键之一是多线程是针对 I/O 进程的,而多处理是针对 CPU 进程的”。

您需要弄清楚,您的程序是 IO 密集型还是 CPU 密集型,然后应用正确的方法来解决您的问题。随机或同时应用各种方法通常会让事情变得更糟。