use*_*244 9 python multithreading multiprocessing concurrent.futures
我正在使用并发.futures 模块来进行多处理和多线程处理。我在具有 16GB RAM、英特尔 i7 第八代处理器的 8 核机器上运行它。我在 Python 3.7.2 甚至 Python 3.8.2 上尝试过这个
import concurrent.futures
import time
Run Code Online (Sandbox Code Playgroud)
获取列表并将每个元素乘以 2
def double_value(x):
y = []
for elem in x:
y.append(2 *elem)
return y
Run Code Online (Sandbox Code Playgroud)
将 elem 乘以 2
def double_single_value(x):
return 2* x
Run Code Online (Sandbox Code Playgroud)
定义一个
import numpy as np
a = np.arange(100000000).reshape(100, 1000000)
Run Code Online (Sandbox Code Playgroud)
运行多个线程并将每个 elem 乘以 2 的函数
def get_double_value(x):
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(double_single_value, x)
return list(results)
Run Code Online (Sandbox Code Playgroud)
t = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
my_results = executor.map(double_value, a)
print(time.time()-t)
Run Code Online (Sandbox Code Playgroud)
t = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
my_results = executor.map(get_double_value, a)
print(time.time()-t)
Run Code Online (Sandbox Code Playgroud)
我真的很想明白:
(我已经阅读了许多描述多处理和多线程的帖子,我得到的关键之一是多线程是针对 I/O 进程的,而多处理是针对 CPU 进程的?)
您可以将并发与并行混合在一起。 为什么?你可以有你的正当理由。想象一下您必须发出一堆请求,同时尽可能快地处理它们的响应(例如,将 XML 转换为 JSON)。
我做了一些测试,这是结果。在每次测试中,我混合不同的解决方法来打印 16000 次(我有 8 个核心和 16 个线程)。
multiprocessing, 并发性asyncio最快,1.1152372360229492秒。
import asyncio
import multiprocessing
import os
import psutil
import threading
import time
async def print_info(value):
await asyncio.sleep(1)
print(
f"THREAD: {threading.get_ident()}",
f"PROCESS: {os.getpid()}",
f"CORE_ID: {psutil.Process().cpu_num()}",
f"VALUE: {value}",
)
async def await_async_logic(values):
await asyncio.gather(
*(
print_info(value)
for value in values
)
)
def run_async_logic(values):
asyncio.run(await_async_logic(values))
def multiprocessing_executor():
start = time.time()
with multiprocessing.Pool() as multiprocessing_pool:
multiprocessing_pool.map(
run_async_logic,
(range(1000 * x, 1000 * (x + 1)) for x in range(os.cpu_count())),
)
end = time.time()
print(end - start)
multiprocessing_executor()
Run Code Online (Sandbox Code Playgroud)
非常重要的提示:有了asyncio我,我可以随心所欲地发送垃圾邮件任务。例如,我可以将值从 更改1000为10000生成 160000 个打印件,并且没有问题(我测试了它,它花了我 2.0210490226745605 秒)。
multiprocessing, 并发性threading另一种选择是 1.6983509063720703 秒。
import multiprocessing
import os
import psutil
import threading
import time
def print_info(value):
time.sleep(1)
print(
f"THREAD: {threading.get_ident()}",
f"PROCESS: {os.getpid()}",
f"CORE_ID: {psutil.Process().cpu_num()}",
f"VALUE: {value}",
)
def multithreading_logic(values):
threads = []
for value in values:
threads.append(threading.Thread(target=print_info, args=(value,)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
def multiprocessing_executor():
start = time.time()
with multiprocessing.Pool() as multiprocessing_pool:
multiprocessing_pool.map(
multithreading_logic,
(range(1000 * x, 1000 * (x + 1)) for x in range(os.cpu_count())),
)
end = time.time()
print(end - start)
multiprocessing_executor()
Run Code Online (Sandbox Code Playgroud)
非常重要的注意事项:使用这种方法,我不能像我想要的那样发送垃圾邮件。如果我将值从 更改为 ,1000我10000会得到RuntimeError: can't start new thread。我还想说,我印象深刻,因为我认为这种方法在各个方面都会比 asyncio 更好,但事实恰恰相反。
concurrent.futures非常慢,50.08251595497131 秒。
import os
import psutil
import threading
import time
from concurrent.futures import thread, process
def print_info(value):
time.sleep(1)
print(
f"THREAD: {threading.get_ident()}",
f"PROCESS: {os.getpid()}",
f"CORE_ID: {psutil.Process().cpu_num()}",
f"VALUE: {value}",
)
def multithreading_logic(values):
with thread.ThreadPoolExecutor() as multithreading_executor:
multithreading_executor.map(
print_info,
values,
)
def multiprocessing_executor():
start = time.time()
with process.ProcessPoolExecutor() as multiprocessing_executor:
multiprocessing_executor.map(
multithreading_logic,
(range(1000 * x, 1000 * (x + 1)) for x in range(os.cpu_count())),
)
end = time.time()
print(end - start)
multiprocessing_executor()
Run Code Online (Sandbox Code Playgroud)
非常重要的注意事项:使用此方法,与使用 一样asyncio,我可以根据需要发送任意数量的垃圾邮件。例如,我可以将值从 更改1000为10000生成 160000 个打印件,并且没有问题(时间除外)。
为了发表此评论,我修改了测试,使其仅进行 1600 次打印(修改每个测试中的1000值100)。
当我从 asyncio 中删除并行性时,执行需要 16.090194702148438 秒。另外,如果我用 替换await asyncio.sleep(1),time.sleep(1)则需要 160.1889989376068 秒。
从多线程选项中删除并行性,执行需要 16.24941658973694 秒。现在我印象深刻。没有多处理的多线程给我带来了良好的性能,与 asyncio 非常相似。
从第三个选项中删除并行性,执行时间为 80.15227723121643 秒。
正如您所说:“我已经阅读了许多描述多处理和多线程的帖子,我得到的关键之一是多线程是针对 I/O 进程的,而多处理是针对 CPU 进程的”。
您需要弄清楚,您的程序是 IO 密集型还是 CPU 密集型,然后应用正确的方法来解决您的问题。随机或同时应用各种方法通常会让事情变得更糟。
| 归档时间: |
|
| 查看次数: |
8660 次 |
| 最近记录: |