如何在Python中的多个异步进程之间进行同步?

dor*_*mon 5 python synchronization file-locking multiprocessing python-asyncio

我有一个使用 fastapi 的异步 http web 服务。我在服务器上的不同端口上运行同一服务的多个实例,并且前面有一个 nginx 服务器,因此我可以利用它们。我有一个特定的资源,需要保护只有一个客户端可以访问它。

@app.get("/do_something")
async def do_something():
     critical_section_here()
Run Code Online (Sandbox Code Playgroud)

我尝试使用如下文件锁来保护这个关键部分:

@app.get("/do_something")
async def do_something():
    with FileLock("dosomething.lock"):
        critical_section()
Run Code Online (Sandbox Code Playgroud)

这样可以防止多个进程同时进入临界区。但我发现这实际上会死锁。考虑以下事件:

  1. 客户端1连接8000端口并进入临界区
  2. 当客户端 1 仍在使用资源时,客户端 2 被路由到相同的端口 8000,然后它将尝试获取文件锁,但它不能,因此它将继续尝试,这将阻止客户端 1 的执行,而客户端 1 永远不会能够释放文件锁,这意味着不仅此进程被锁定,所有其他服务器实例也将被锁定。

有没有一种方法可以让我协调这些进程,以便只有其中一个进程访问关键部分?我想过为文件锁添加超时,但我真的不想拒绝用户,我只想等到轮到他/她进入临界区。

HTF*_*HTF 5

你可以尝试这样的事情:

import fcntl

from contextlib import asynccontextmanager

from fastapi import FastAPI

app = FastAPI()


def acquire_lock():
    f = open("/tmp/test.lock", "w")
    fcntl.flock(f, fcntl.LOCK_EX)
    return f


@asynccontextmanager
async def lock():
    loop = asyncio.get_running_loop()
    f = await loop.run_in_executor(None, acquire_lock)
    try:
        yield
    finally:
        f.close()


@app.get("/test/")
async def test():
    async with lock():
        print("Enter critical section")
        await asyncio.sleep(5)
        print("End critical section")
Run Code Online (Sandbox Code Playgroud)

它基本上会序列化您的所有请求。