PIL 和使用 asyncio 阻塞调用

use*_*803 4 python numpy blocking python-imaging-library python-asyncio

我有一个 asyncio 应用程序,它使用服务器 fromaiohttp和异步套接字asyncio.open_connection()

我的代码包含来自 PIL 库的一些阻塞调用,例如

Image.save()
Image.resize()
Run Code Online (Sandbox Code Playgroud)
  1. 即使调用不会阻塞太长时间,如果我使用这些阻塞调用,我的 Web 服务器是否会冻结?更准确地说,事件循环是否有可能因为阻塞代码而错过事件?
  2. 如果是,与 asyncio 集成的这些功能的替代品是什么?没有异步版本的 PIL。
  3. 一般来说,什么被认为是 asyncio 中的“阻塞代码”?除了像socket,读文件等明显的操作。
    例如,是否os.path.join()被认为可以?在numpy数组上工作怎么样?

Mik*_*mov 11

如果我使用这些阻塞调用,我的 Web 服务器会冻结吗?更准确地说,事件循环是否有可能因为阻塞代码而错过事件?

服务器将精确冻结执行图像功能的时间。您不会错过任何事件,但所有事件处理都会在图像函数执行时延迟。

冻结事件循环是一个糟糕的情况——你应该避免它。

如果是,与 asyncio 集成的这些功能的替代品是什么?没有异步版本的 PIL。

避免冻结事件循环的最简单和通用的方法 - 使用asyncio.run_in_executor在另一个线程或另一个进程中执行阻塞函数。那里的代码片段显示了如何做到这一点,并包含了何时使用进程或线程的很好的解释:

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))
Run Code Online (Sandbox Code Playgroud)

我只想补充一点,对于每个受 CPU 限制的操作,进程池可能并不总是好的解决方案。如果您的图像功能不需要太多时间(或者特别是如果您的服务器没有多个处理器核心),在一个线程中运行它们可能仍然更有效率。

一般来说,什么被认为是 asyncio 中的“阻塞代码”?除了socket,读取文件等明显的操作之外,例如os.path.join()是否被认为可以?在 numpy 数组上工作怎么样?

粗略地说,任何函数都是阻塞的:它会阻塞事件循环一段时间。但是很多功能都os.path.join需要很短的时间,所以它们不是问题,我们不称它们为“阻塞”。

当执行时间(和事件循环冻结)成为问题时,很难说确切的限制,特别是考虑到这个时间对于不同的硬件会有所不同。我有偏见的建议 - 如果您的代码在将控制权返回给事件循环之前需要(或可能需要)> 50 毫秒,请考虑将其阻塞并使用run_in_executor.

更新:

谢谢,使用一个事件循环(主线程的)并使用另一个将使用相同循环添加任务的线程有意义吗?

我不确定你在这里的意思,但我认为不是。我们需要另一个线程来运行一些作业,而不是在那里添加任务。

我需要某种方式让线程在图像处理完成后通知主线程`

只需等待结果run_in_executor或用它开始任务。run_in_executor- 是一个协程,它在后台线程中执行某事而不阻塞事件循环。

它看起来像这样:

thrad_pool = ThreadPoolExecutor()


def process_image(img):
    # all stuff to process image here
    img.save()
    img.resize()


async def async_image_process(img):
    await loop.run_in_executor(
        thread_pool, 
        partial(process_image, img)
    )


async def handler(request):

    asyncio.create_task(
        async_image_process(img)
    )
    # we use task to return response immediately,
    # read /sf/answers/2614189511/

    return web.Response(text="Image processed without blocking other requests")
Run Code Online (Sandbox Code Playgroud)