How to combine python asyncio with threads?

fxs*_*ein 33 python multithreading python-asyncio aiohttp

I have successfully built a RESTful microservice with Python asyncio and aiohttp that listens to POST events to collect realtime events from various feeders.

It then builds an in-memory structure to cache the last 24h of events in a nested defaultdict/deque structure.

Now I would like to periodically checkpoint that structure to disk, preferably using pickle.

Since the memory structure can be >100MB, I would like to avoid holding up my incoming event processing for the time it takes to checkpoint the structure.

I'd rather create a snapshot copy (e.g. deepcopy) of the structure, then take my time writing it to disk, and repeat on a preset time interval.

I have been searching for examples of how to combine threads (and is a thread even the best solution for this?) with asyncio for that purpose, but could not find something that would help me.

Any pointers for getting started are much appreciated!

dan*_*ano 51

It's pretty simple to delegate a method to a thread or sub-process using BaseEventLoop.run_in_executor:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x)  # This is some operation that is CPU-bound

async def main():
    loop = asyncio.get_running_loop()
    # Run cpu_bound_operation in the ProcessPoolExecutor.
    # This will make your coroutine block, but won't block
    # the event loop; other coroutines can run in the meantime.
    await loop.run_in_executor(p, cpu_bound_operation, 5)

p = ProcessPoolExecutor(2)  # Create a ProcessPool with 2 processes
asyncio.run(main())

As for whether to use ProcessPoolExecutor or ThreadPoolExecutor, that's kind of difficult to say; pickling a large object will definitely eat some CPU cycles, which initially would make you think ProcessPoolExecutor is the way to go. However, passing your 100MB object to a Process in the pool would require pickling the instance in your main process, sending the bytes to the child process via IPC, unpickling it in the child, and then pickling it again so it can be written to disk. Given that, my guess is the pickling/unpickling overhead will be large enough that you're better off using a ThreadPoolExecutor, even though you're going to take a performance hit because of the GIL.
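Applied to the checkpointing use case in the question, a minimal ThreadPoolExecutor sketch could look like this (the cache structure, file path, and interval are illustrative assumptions, not part of the original answer):

```python
import asyncio
import copy
import pickle
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)

def write_snapshot(snapshot, path):
    # Runs in a worker thread: the slow pickling and disk I/O
    # happen off the event loop.
    with open(path, 'wb') as f:
        pickle.dump(snapshot, f)

async def checkpoint_periodically(cache, path, interval=60):
    loop = asyncio.get_running_loop()
    while True:
        await asyncio.sleep(interval)
        # The deepcopy itself still blocks the loop briefly; only the
        # expensive pickle/write is delegated to the worker thread.
        snapshot = copy.deepcopy(cache)
        await loop.run_in_executor(executor, write_snapshot, snapshot, path)
```

You would start `checkpoint_periodically(cache, 'events.pkl')` as a task alongside your aiohttp handlers.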

That said, it's simple enough to test it both ways and find out for sure, so you might as well do that.
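If you do want to measure it, a quick (unscientific) comparison along these lines should settle the question for your actual data; the stand-in data structure here is an assumption:

```python
import pickle
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def dump_to_disk(obj, path):
    # The work we are timing: pickle the object and write it out.
    with open(path, 'wb') as f:
        pickle.dump(obj, f)

def time_executor(executor_cls, obj, path):
    # Time a single submit/result round trip; for the process pool this
    # includes the IPC pickling overhead discussed above.
    with executor_cls(1) as ex:
        start = time.perf_counter()
        ex.submit(dump_to_disk, obj, path).result()
        return time.perf_counter() - start

if __name__ == '__main__':
    # Stand-in for the real ~100MB cache; scale it up for a fair test.
    data = {i: list(range(10)) for i in range(1000)}
    print('threads:  ', time_executor(ThreadPoolExecutor, data, '/tmp/t.pkl'))
    print('processes:', time_executor(ProcessPoolExecutor, data, '/tmp/p.pkl'))
```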


Lon*_*ami 7

Another alternative is to use loop.call_soon_threadsafe together with an asyncio.Queue as the intermediate channel of communication.

The current documentation for Python 3 also has a section on Developing with asyncio - Concurrency and Multithreading:

import asyncio

# This method represents your blocking code
def blocking(loop, queue):
    import time
    while True:
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking A')
        time.sleep(2)
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking B')
        time.sleep(2)

# This method represents your async code
async def nonblocking(queue):
    await asyncio.sleep(1)
    while True:
        queue.put_nowait('Non-blocking A')
        await asyncio.sleep(2)
        queue.put_nowait('Non-blocking B')
        await asyncio.sleep(2)

# The main sets up the queue as the communication channel and synchronizes them
async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    blocking_fut = loop.run_in_executor(None, blocking, loop, queue)
    nonblocking_task = loop.create_task(nonblocking(queue))

    running = True  # use whatever exit condition
    while running:
        # Get messages from both blocking and non-blocking in parallel
        message = await queue.get()
        # You could send any messages, and do anything you want with them
        print(message)

asyncio.run(main())

How to send asyncio tasks to a loop running in another thread may also help you.
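For that direction (submitting coroutines from a plain thread into a running loop), asyncio.run_coroutine_threadsafe is the standard tool; a small sketch, with illustrative names:

```python
import asyncio
import threading

async def double(x):
    await asyncio.sleep(0.1)
    return x * 2

def worker(loop):
    # Called from a plain thread: schedule the coroutine on the loop
    # running in the main thread and block until its result arrives.
    future = asyncio.run_coroutine_threadsafe(double(21), loop)
    print('result from thread:', future.result())  # prints 42

async def main():
    loop = asyncio.get_running_loop()
    t = threading.Thread(target=worker, args=(loop,))
    t.start()
    # Keep the loop alive while the thread waits on the future.
    await asyncio.sleep(0.5)
    t.join()

asyncio.run(main())
```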

If you need a more "powerful" example, check out my Wrapper to launch async tasks from threaded code. It will handle the thread-safety part for you (for the most part) and let you do things like this:

# See https://gist.github.com/Lonami/3f79ed774d2e0100ded5b171a47f2caf for the full example

async def async_main(queue):
    # your async code can go here
    while True:
        command = await queue.get()
        if command.id == 'print':
            print('Hello from async!')
        elif command.id == 'double':
            await queue.put(command.data * 2)

with LaunchAsync(async_main) as queue:
    # your threaded code can go here
    queue.put(Command('print'))
    queue.put(Command('double', 7))
    response = queue.get(timeout=1)
    print('The result of doubling 7 is', response)


eni*_*ist 6

I also used run_in_executor, but I found the function somewhat clunky under most circumstances, since it requires partial() for keyword args and I never call it with anything other than a single executor and the default event loop. So I made a convenience wrapper around it with sensible defaults and automatic keyword argument handling.

import asyncio as aio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import sleep

loop = aio.get_event_loop()

class Executor:
    """In most cases, you can just use the 'execute' instance as a
    function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
    the executor, assign result to y. The defaults can be changed, though,
    with your own instantiation of Executor, i.e. execute =
    Executor(nthreads=4)."""
    def __init__(self, loop=loop, nthreads=1):
        self._ex = ThreadPoolExecutor(nthreads)
        self._loop = loop
    def __call__(self, f, *args, **kw):
        return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))

execute = Executor()

...

def cpu_bound_operation(t, alpha=30):
    sleep(t)
    return 20*alpha

async def main():
    y = await execute(cpu_bound_operation, 5, alpha=-2)

loop.run_until_complete(main())
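On Python 3.9+, note that asyncio.to_thread gives you much the same convenience out of the box (keyword arguments included), if you don't need control over the executor; a minimal sketch reusing the function above:

```python
import asyncio
import time

def cpu_bound_operation(t, alpha=30):
    time.sleep(t)
    return 20 * alpha

async def main():
    # to_thread forwards *args and **kwargs, so no partial() is needed.
    y = await asyncio.to_thread(cpu_bound_operation, 0.1, alpha=-2)
    print(y)  # -40

asyncio.run(main())
```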