“GIL”如何影响带有 i/o 绑定任务的 Python asyncio `run_in_executor`?

adn*_*leb 3 python multithreading async-await python-asyncio

关于Python ayncio的代码示例 run_in_executor

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

该示例(在注释中)建议使用 运行 i/o 绑定函数ThreadPoolExecutor,并使用 .cpu 绑定函数ProcessPoolExecutor。我想用三个问题来验证我对这背后原因的理解:

  1. 这些建议并不是真正的建议,因为否则事件循环将会阻塞。因此,我们将失去事件编程的主要好处,对吗?

  2. 作为单独的线程运行 io/ 绑定任务,需要以下假设: i/o 调用将释放 GIL,对吗?因为除此之外,操作系统将无法在事件循环和这个新的单独线程之间进行上下文切换。

  3. 如果第 2 点的答案是肯定的,那么如何确定 i/o 调用是否释放了 GIL?

Mik*_*mov 8

这些建议并不是真正的建议,因为否则事件循环将会阻塞。因此,我们将失去事件编程的主要好处,对吗?

如果您在协程中调用阻塞(I/O 和 CPU 阻塞)函数而不等待执行器,事件循环将阻塞。就这一点而言,是的,你不应该允许这种情况发生。

我的建议是,它是每种类型的阻塞代码的执行器类型:对 CPU 绑定的东西使用 ProcessPoolExecutor,对 I/O 绑定的东西使用 ThreadPoolExecutor。

作为单独的线程运行 io/ 绑定任务,需要以下假设: i/o 调用将释放 GIL,对吗?因为除此之外,操作系统将无法在事件循环和这个新的单独线程之间进行上下文切换。

当涉及到多线程时,Python 将在很短的时间内在线程之间切换,而不会释放 GIL。但是,如果一个或多个线程有 I/O(或 C 代码),则 GIL 将被释放,从而允许解释器将更多时间花在需要它的线程上。

底线是:

  • 您可以在执行器中运行任何阻塞代码,它不会阻塞事件循环。您获得了并发性,但可能会也可能不会获得性能。
  • 例如,如果您在 ThreadPoolExecutor 中运行 CPU 密集型代码,则由于 GIL,您将不会从并发性中获得性能优势。为了获得 CPU 密集型内容的性能,您应该使用 ProcessPoolExecutor。
  • 但 I/O 限制可以在 ThreadPoolExecutor 中运行,并且您可以获得性能。这里没有必要使用更重的 ProcessPoolExecutor 。

我写了一个例子来演示它是如何工作的:

import sys
import asyncio
import time
import concurrent.futures
import requests
from contextlib import contextmanager

process_pool = concurrent.futures.ProcessPoolExecutor(2)
thread_pool = concurrent.futures.ThreadPoolExecutor(2)


def io_bound():
    for i in range(3):
        requests.get("https://httpbin.org/delay/0.4")  # I/O blocking
        print(f"I/O bound {i}")
        sys.stdout.flush()


def cpu_bound():
    for i in range(3):
        sum(i * i for i in range(10 ** 7))  # CPU blocking
        print(f"CPU bound {i}")
        sys.stdout.flush()


async def run_as_is(func):
    func()


async def run_in_process(func):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(process_pool, func)


async def run_in_thread(func):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(thread_pool, func)


@contextmanager
def print_time():
    start = time.time()
    yield
    finished = time.time() - start
    print(f"Finished in {round(finished, 1)}\n")


async def main():
    print("Wrong due to blocking code in coroutine,")
    print(
        "you get neither performance, nor concurrency (which breaks async nature of the code)"
    )
    print("don't allow this to happen")
    with print_time():
        await asyncio.gather(run_as_is(cpu_bound), run_as_is(io_bound))

    print("CPU bound works concurrently with threads,")
    print("but you gain no performance due to GIL")
    with print_time():
        await asyncio.gather(run_in_thread(cpu_bound), run_in_thread(cpu_bound))

    print("To get perfromance for CPU-bound,")
    print("use process executor")
    with print_time():
        await asyncio.gather(run_in_process(cpu_bound), run_in_process(cpu_bound))

    print("I/O bound will gain benefit from processes as well...")
    with print_time():
        await asyncio.gather(run_in_process(io_bound), run_in_process(io_bound))

    print(
        "... but there's no need in processes since you can use lighter threads for I/O"
    )
    with print_time():
        await asyncio.gather(run_in_thread(io_bound), run_in_thread(io_bound))

    print("Long story short,")
    print("Use processes for CPU bound due to GIL")
    print(
        "and use threads for I/O bound since you benefit from concurrency regardless of GIL"
    )
    with print_time():
        await asyncio.gather(run_in_thread(io_bound), run_in_process(cpu_bound))


if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

输出:

Wrong due to blocking code in coroutine,
you get neither performance, nor concurrency (which breaks async nature of the code)
don't allow this to happen
CPU bound 0
CPU bound 1
CPU bound 2
I/O bound 0
I/O bound 1
I/O bound 2
Finished in 5.3

CPU bound works concurrently with threads,
but you gain no performance due to GIL
CPU bound 0
CPU bound 0
CPU bound 1
CPU bound 1
CPU bound 2
CPU bound 2
Finished in 4.6

To get perfromance for CPU-bound,
use process executor
CPU bound 0
CPU bound 0
CPU bound 1
CPU bound 1
CPU bound 2
CPU bound 2
Finished in 2.5

I/O bound will gain benefit from processes as well...
I/O bound 0
I/O bound 0
I/O bound 1
I/O bound 1
I/O bound 2
I/O bound 2
Finished in 3.3

... but there's no need in processes since you can use lighter threads for I/O
I/O bound 0
I/O bound 0
I/O bound 1
I/O bound 1
I/O bound 2
I/O bound 2
Finished in 3.1

Long story short,
Use processes for CPU bound due to GIL
and use threads for I/O bound since you benefit from concurrency regardless of GIL
CPU bound 0
I/O bound 0
CPU bound 1
I/O bound 1
CPU bound 2
I/O bound 2
Finished in 2.9
Run Code Online (Sandbox Code Playgroud)