Vla*_*ich 2 python python-multithreading python-contextvars
就目前而言,我已经找到了很多关于 contextvars 模块如何与 asyncio 一起运行的示例,但没有一个关于如何与线程一起运行的示例(asyncio.get_event_loop().run_in_executor、threading.Thread 等)。
我的问题是,如何将上下文传递给单独的线程?下面您可以看到一个不起作用的代码片段(python 3.9.8)。
import typing
import asyncio
import contextvars
import concurrent.futures
class CustomThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
def submit(
self,
function: typing.Callable,
*args,
**kwargs
) -> concurrent.futures.Future:
context = contextvars.copy_context()
return super().submit(
context.run,
functools.partial(function, *args, **kwargs)
)
def function():
print(var.get())
async def main():
await asyncio.get_event_loop().run_in_executor(None, function)
if __name__ == '__main__':
var = contextvars.ContextVar('variable')
var.set('Message.')
asyncio.get_event_loop().set_default_executor(CustomThreadPoolExecutor)
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
您可以使用需要的包装函数copy_context.items(),设置它们并调用您的函数。functools.partial将帮助您创建包装函数以传递给run_in_executor. 这是我的装饰器的工作测试:
def test_run_in_thread_pool_executor():
def init(func, ctx_vars, *args, **kwargs):
for var, value in ctx_vars:
var.set(value)
return func(*args, **kwargs)
@async_add_headers('streaming')
async def wrapper(f):
loop = asyncio.get_event_loop()
ctx = contextvars.copy_context()
executor = futures.ThreadPoolExecutor(max_workers=5)
return await loop.run_in_executor(executor, functools.partial(init, f, ctx.items()))
@add_headers('client')
def foo():
assert caller_context_var.get() == 'streaming'
async def main_test():
await wrapper(foo)
asyncio.run(main_test())
Run Code Online (Sandbox Code Playgroud)
在这里add_headers,async_add_headers按照调用函数的顺序更改一些上下文变量。caller_context_var.get()将等于'client'没有init功能。
不幸的是,它仅适用于ThreadPoolExecutor,不适用于,ProcessPoolExecutor因为 Context 对象不可picklable。检查相关PEP 567 部分。还有执行器的示例:
executor = ThreadPoolExecutor()
current_context = contextvars.copy_context()
executor.submit(current_context.run, some_function)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1894 次 |
| 最近记录: |