Python:将上下文(contextvars.Context)复制到单独的线程

Vla*_*ich 2 python python-multithreading python-contextvars

就目前而言,我已经找到了很多关于 contextvars 模块如何与 asyncio 一起运行的示例,但没有一个关于如何与线程一起运行的示例(asyncio.get_event_loop().run_in_executor、threading.Thread 等)。

我的问题是,如何将上下文传递给单独的线程?下面您可以看到一个不起作用的代码片段(python 3.9.8)。

import typing
import asyncio
import contextvars
import concurrent.futures


class CustomThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    def submit(
        self,
        function: typing.Callable,
        *args,
        **kwargs
    ) -> concurrent.futures.Future:
        context = contextvars.copy_context()
        return super().submit(
            context.run,
            functools.partial(function, *args, **kwargs)
        )


def function():
    print(var.get())


async def main():
    await asyncio.get_event_loop().run_in_executor(None, function)


if __name__ == '__main__':
    var = contextvars.ContextVar('variable')
    var.set('Message.')

    asyncio.get_event_loop().set_default_executor(CustomThreadPoolExecutor)
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

Den*_*rov 5

您可以使用需要的包装函数copy_context.items(),设置它们并调用您的函数。functools.partial将帮助您创建包装函数以传递给run_in_executor. 这是我的装饰器的工作测试:

def test_run_in_thread_pool_executor():
    def init(func, ctx_vars, *args, **kwargs):
        for var, value in ctx_vars:
            var.set(value)
        return func(*args, **kwargs)

    @async_add_headers('streaming')
    async def wrapper(f):
        loop = asyncio.get_event_loop()
        ctx = contextvars.copy_context()
        executor = futures.ThreadPoolExecutor(max_workers=5)
        return await loop.run_in_executor(executor, functools.partial(init, f, ctx.items()))

    @add_headers('client')
    def foo():
        assert caller_context_var.get() == 'streaming'

    async def main_test():
        await wrapper(foo)

    asyncio.run(main_test())
Run Code Online (Sandbox Code Playgroud)

在这里add_headersasync_add_headers按照调用函数的顺序更改一些上下文变量。caller_context_var.get()将等于'client'没有init功能。

不幸的是,它仅适用于ThreadPoolExecutor,不适用于,ProcessPoolExecutor因为 Context 对象不可picklable。检查相关PEP 567 部分。还有执行器的示例:

executor = ThreadPoolExecutor()
current_context = contextvars.copy_context()

executor.submit(current_context.run, some_function)
Run Code Online (Sandbox Code Playgroud)