如何使用 asyncio 和 postgres 在 python 中进行事务处理?

Art*_*nov 5 python postgresql transactions async-await python-asyncio

我的RPC方法中有两个操作:

async def my_rpc(self, data):
    async with self.Engine() as conn:
        await conn.execute("SELECT ... FROM MyTable");
        ...  # It seems the table MyTable can be changed by another RPC
        await conn.execute("UPDATA MyTable ...");
Run Code Online (Sandbox Code Playgroud)

另一种 RPC 方法可以在操作“my_rpc”完成之前更改数据库(在 SQL 查询的两次等待之间)。如何避免这种情况呢?

self.Engine 的代码(使用 engine 调用aiopg.sa.create_engine):

class ConnectionContextManager(object):
    def __init__(self, engine):
        self.conn = None
        self.engine = engine

    async def __aenter__(self):
        if self.engine:
            self.conn = await self.engine.acquire()
            return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        try:
            self.engine.release(self.conn)
            self.conn.close()
        finally:
            self.conn = None
            self.engine = None
Run Code Online (Sandbox Code Playgroud)

Pet*_*ter 5

首先,aiopg在自动提交模式下工作,这意味着您必须在手动模式下使用事务。阅读更多详情

其次,您必须使用 SELECT FOR UPDATE 来锁定在第一个语句中读取的行。SELECT FOR UPDATE 锁定选择行,直到事务完成。阅读更多详情

async def my_rpc(self, data):
    async with self.Engine() as conn:
        await conn.execute("BEGIN")
        await conn.execute("SELECT ... FROM MyTable WHERE some_clause = some_value FOR UPDATE")
        ...  # It seems the table MyTable can be changed by another RPC
        await conn.execute("UPDATE MyTable SET some_clause=...")
        await conn.execute("""COMMIT""")
Run Code Online (Sandbox Code Playgroud)