What is the best way to run multiple tasks in parallel in Python?

Wil*_*iam 3 python multithreading python-multithreading python-asyncio python-multiprocessing

I have a function:

import time

def all_40k():
    for _ in range(400000):
        print('validate')
        print('parsing')
        print('inserting')
if __name__ == '__main__':
    start_time = time.time()
    all_40k()
    print(f'used time:{time.time()-start_time}')

The output is:

used time:9.545064210891724

Since this same body of work is repeated 400k times, I was hoping to have 4 functions running in parallel, each handling 100k iterations; ideally that would be about 4x faster.

So first I tried multithreading:

import threading
import time
def first_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


def second_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def third_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def fourth_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

if __name__ == '__main__':
    start_time = time.time()

    thread1 = threading.Thread(target=first_10k)
    thread2 = threading.Thread(target=second_10k)
    thread3 = threading.Thread(target=third_10k)
    thread4 = threading.Thread(target=fourth_10k)

    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()

    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    print(f'used time:{time.time()-start_time}')

To my surprise, the output was:

used time:23.058093309402466

Then I tried asyncio:

import time
import asyncio

async def test_1():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_2():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_3():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_4():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def multiple_tasks():
    input_coroutines = [test_1(), test_2(), test_3(), test_4()]
    res = await asyncio.gather(*input_coroutines, return_exceptions=True)
    return res

if __name__ == '__main__':
    start_time = time.time()
    res1, res2, res3, res4 = asyncio.run(multiple_tasks())
    print(f'used time:{time.time()-start_time}')

The output is:

used time:9.295843601226807

Finally I tried ProcessPoolExecutor:

import time
from concurrent.futures import ProcessPoolExecutor

def data_handler(urls):
    for i in range(urls[0], urls[1]):
        print('validate')
        print('parsing')
        print('inserting')

def run():
    urls = [(1, 100000), (100001, 200000), (200001, 300000), (300001, 400000)]
    with ProcessPoolExecutor() as executor:
        executor.map(data_handler, urls)

if __name__ == '__main__':
    start_time = time.time()
    run()
    stop_time = time.time()
    print('used time %s' % (stop_time - start_time))

The output is:

used time 12.726619243621826

So how can I speed this up? I think I've gone down the wrong path. Can anyone help? Best regards!

fla*_*kes 5

OK, so here is what you observed:

No parallelism    9.545064210891724
asyncio           9.295843601226807
multiprocessing   12.726619243621826
multithreading    23.058093309402466

First, asyncio does not actually use threads, and as you might guess, its performance depends on there being some I/O involved. Asyncio alternates between tasks on an event loop, switching whenever one of them hits an `await`. If `await` is never used, it ends up running the tasks one at a time and never switches at all.
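You can see this switching behavior directly in a minimal sketch (the `chatty` helper and its `yield_control` flag are my own names for illustration):

```python
import asyncio

order = []

async def chatty(name, yield_control):
    for _ in range(3):
        order.append(name)
        if yield_control:
            await asyncio.sleep(0)  # hand control back to the event loop

async def main(yield_control):
    order.clear()
    await asyncio.gather(chatty("a", yield_control), chatty("b", yield_control))
    return list(order)

# Without any awaits the loop never switches: "a" runs to completion first.
sequential = asyncio.run(main(False))
# With an await at each step the loop alternates between the two tasks.
interleaved = asyncio.run(main(True))
print(sequential)   # ['a', 'a', 'a', 'b', 'b', 'b']
print(interleaved)  # ['a', 'b', 'a', 'b', 'a', 'b']
```

Either way the total CPU work is the same, which is why your asyncio run clocked in at essentially the single-threaded time.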

For threads, only one thread at a time can execute Python bytecode, because of the Global Interpreter Lock. What you end up with here is a pile of contention between threads that are all trying to do work at the same time, and that context switching slows your application down. As with asyncio, you only really get a speedup if you can schedule other work while waiting on some I/O.
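For contrast, when the work genuinely is waiting (simulated here with `time.sleep` standing in for a socket or database call), threads do overlap those waits; a minimal sketch:

```python
import threading
import time

def io_bound():
    time.sleep(0.2)  # stands in for waiting on a socket or database

start = time.time()
threads = [threading.Thread(target=io_bound) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start
print(f'elapsed: {elapsed:.2f}s')  # roughly 0.2s, not 0.8s: the waits overlap
```

The GIL is released while a thread sleeps or blocks on I/O, which is why the four 0.2 s waits cost about 0.2 s total rather than adding up.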

OK, so surely the multiprocessing case should run faster... right? Well, each process does get its own interpreter lock, but the catch lies in your `print` statements. Every process blocks while trying to push its output down the same console pipe. Let me show you with an example.

Suppose we have a method we want to run 4 times, once serially and once in parallel:

import time
from concurrent.futures import ProcessPoolExecutor

def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(500000):
        print('foobar')
    print(f"Finished thread: {thread}")


def run_singlethreaded():
    start_time = time.time()

    for thread in ["local"] * 4:
        run(thread)

    stop_time = time.time()
    return stop_time - start_time


def run_multiprocessing():
    start_time = time.time()

    with ProcessPoolExecutor(max_workers=4) as ex:
        ex.map(run, ["mp0", "mp1", "mp2", "mp3"])

    stop_time = time.time()
    return stop_time - start_time

if __name__ == '__main__':
    singlethreaded_time = run_singlethreaded()
    multiprocessing_time = run_multiprocessing()
    print(f"Finished singlethreaded in:  {singlethreaded_time}")
    print(f"Finished multiprocessing in: {multiprocessing_time}")

If we run this and print the times, you might be surprised to see:

Finished singlethreaded in:  10.513998746871948
Finished multiprocessing in: 12.252000570297241

Now, if we change the prints to something simpler that doesn't create an IO bottleneck:

def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(100000000):
        pass
    print(f"Finished thread: {thread}")

You get the parallel speedup you'd expect:

Finished singlethreaded in:  9.816999435424805
Finished multiprocessing in: 2.503000020980835

The important point here is that before parallelism can help you, you need to understand where you are resource constrained. For IO-bound applications, threading or asyncio may help. For CPU-bound applications, multiprocessing can be useful. And there are other times (like your `print` statements) where neither will really help, because the bottleneck lives in a system outside your application. Hope this helps!
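To make that concrete for the original code: since the bottleneck is the console pipe, one option that needs no parallelism at all is to build the output in memory and write it out once. This is just an illustrative sketch (the `all_rows_buffered` helper is a hypothetical name, not from the original code):

```python
import io

def all_rows_buffered(n):
    # Accumulate all rows in a string buffer instead of making
    # three print() calls (three pipe writes) per iteration.
    buf = io.StringIO()
    for _ in range(n):
        buf.write('validate\nparsing\ninserting\n')
    return buf.getvalue()  # write this once with sys.stdout.write(...)

out = all_rows_buffered(1000)
print(out.count('validate'))  # 1000 rows were produced
```

One big write amortizes the per-call overhead that dominated every timing above; if the real validate/parse/insert work is CPU-heavy rather than print-heavy, that is when `ProcessPoolExecutor` starts paying off.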