asyncpg - 无法执行操作:另一个操作正在进行中

Rod*_*ele 22 asynchronous python-3.x python-asyncio asyncpg fastapi

我正在尝试解决以下错误:

\n
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress\n
Run Code Online (Sandbox Code Playgroud)\n

这是完整的回溯:

\n
Traceback (most recent call last):\n\n  File "<string>", line 1, in <module>\n  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main\n    exitcode = _main(fd, parent_sentinel)\n               \xe2\x94\x82     \xe2\x94\x82   \xe2\x94\x94 4\n               \xe2\x94\x82     \xe2\x94\x94 7\n               \xe2\x94\x94 <function _main at 0x109c8aca0>\n  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 129, in _main\n    return self._bootstrap(parent_sentinel)\n           \xe2\x94\x82    \xe2\x94\x82          \xe2\x94\x94 4\n           \xe2\x94\x82    \xe2\x94\x94 <function BaseProcess._bootstrap at 0x109b1f8b0>\n           \xe2\x94\x94 <SpawnProcess name='SpawnProcess-4' parent=36604 started>\n  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap\n    self.run()\n    \xe2\x94\x82    \xe2\x94\x94 <function BaseProcess.run at 0x109b18ee0>\n    \xe2\x94\x94 <SpawnProcess name='SpawnProcess-4' parent=36604 started>\n  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 108, in run\n    self._target(*self._args, **self._kwargs)\n    \xe2\x94\x82    \xe2\x94\x82        \xe2\x94\x82    \xe2\x94\x82        \xe2\x94\x82    \xe2\x94\x94 {'config': <uvicorn.config.Config object at 0x109cd55b0>, 'target': <bound method Server.run of <uvicorn.server.Server object...\n    \xe2\x94\x82    \xe2\x94\x82        \xe2\x94\x82    \xe2\x94\x82        \xe2\x94\x94 <SpawnProcess name='SpawnProcess-4' parent=36604 started>\n    \xe2\x94\x82    \xe2\x94\x82        \xe2\x94\x82    \xe2\x94\x94 ()\n    \xe2\x94\x82    \xe2\x94\x82        \xe2\x94\x94 <SpawnProcess name='SpawnProcess-4' parent=36604 started>\n    \xe2\x94\x82    \xe2\x94\x94 <function subprocess_started at 0x10a4aca60>\n    \xe2\x94\x94 <SpawnProcess name='SpawnProcess-4' parent=36604 started>\n  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/uvicorn/subprocess.py", line 61, in subprocess_started\n    target(sockets=sockets)\n    \xe2\x94\x82              \xe2\x94\x94 [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 80)>]\n    \xe2\x94\x94 <bound method Server.run of <uvicorn.server.Server object at 0x109cd56a0>>\n  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/uvicorn/server.py", line 48, in run\n    loop.run_until_complete(self.serve(sockets=sockets))\n    \xe2\x94\x82    \xe2\x94\x82                  \xe2\x94\x82    \xe2\x94\x82             \xe2\x94\x94 [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 80)>]\n    \xe2\x94\x82    \xe2\x94\x82                  \xe2\x94\x82    \xe2\x94\x94 <function Server.serve at 0x10a4abca0>\n    \xe2\x94\x82    \xe2\x94\x82                  \xe2\x94\x94 <uvicorn.server.Server object at 0x109cd56a0>\n    \xe2\x94\x82    \xe2\x94\x94 <function BaseEventLoop.run_until_complete at 0x10a205820>\n    \xe2\x94\x94 <_UnixSelectorEventLoop running=True closed=False debug=False>\n  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete\n    self.run_forever()\n    \xe2\x94\x82    \xe2\x94\x94 <function BaseEventLoop.run_forever at 0x10a205790>\n    \xe2\x94\x94 <_UnixSelectorEventLoop running=True closed=False debug=False>\n  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 570, in run_forever\n    self._run_once()\n    \xe2\x94\x82    \xe2\x94\x94 <function BaseEventLoop._run_once at 0x10a209310>\n    \xe2\x94\x94 <_UnixSelectorEventLoop running=True closed=False debug=False>\n  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once\n    handle._run()\n    \xe2\x94\x82      \xe2\x94\x94 <function Handle._run at 0x10a13ed30>\n    \xe2\x94\x94 <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>\n  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/events.py", line 81, in _run\n    self._context.run(self._callback, *self._args)\n    \xe2\x94\x82    \xe2\x94\x82            \xe2\x94\x82    \xe2\x94\x82           \xe2\x94\x82    \xe2\x94\x94 <member '_args' of 'Handle' objects>\n    \xe2\x94\x82    \xe2\x94\x82            \xe2\x94\x82    \xe2\x94\x82           \xe2\x94\x94 <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>\n    \xe2\x94\x82    \xe2\x94\x82            \xe2\x94\x82    \xe2\x94\x94 <member '_callback' of 'Handle' objects>\n    \xe2\x94\x82    \xe2\x94\x82            \xe2\x94\x94 <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>\n    \xe2\x94\x82    \xe2\x94\x94 <member '_context' of 'Handle' objects>\n    \xe2\x94\x94 <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>\n\n> File "./xxx/xxx/xxx.py", line 144, in get_disclosure_data\n    hh_json, db_json = await asyncio.gather(*coroutines)\n                             \xe2\x94\x82       \xe2\x94\x82       \xe2\x94\x94 [<coroutine object xxxx at 0x10bb2cb40>, <coroutine object db_call at 0x10bb2cc40>]\n                             \xe2\x94\x82       \xe2\x94\x94 <function gather at 0x10a1fad30>\n                             \xe2\x94\x94 <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>\n\n  File "./xxx/xxx/xxx.py", line 52, in db_call\n    db_json = await asyncio.gather(*coroutines, loop=asyncio.get_event_loop())\n                    \xe2\x94\x82       \xe2\x94\x82       \xe2\x94\x82                \xe2\x94\x82       \xe2\x94\x94 <built-in function get_event_loop>\n                    \xe2\x94\x82       \xe2\x94\x82       \xe2\x94\x82                \xe2\x94\x94 <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>\n                    \xe2\x94\x82       \xe2\x94\x82       \xe2\x94\x94 [<coroutine object DBConnectionManager.fetch_item at 0x10bb434c0>, <coroutine object DBConnectionManager.fetch_item at 0x10bb...\n                    \xe2\x94\x82       \xe2\x94\x94 <function gather at 0x10a1fad30>\n                    \xe2\x94\x94 <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>\n\n  File "./xxx/xxx/xx.py", line 97, in fetch_item\n    await self._connection_pool.release(self.con)\n          \xe2\x94\x82    \xe2\x94\x82                \xe2\x94\x82       \xe2\x94\x82    \xe2\x94\x94 <PoolConnectionProxy [released] 0x10bbc9cd0>\n          \xe2\x94\x82    \xe2\x94\x82                \xe2\x94\x82       \xe2\x94\x94 <chd_api.data.db.DBConnectionManager object at 0x10b946a30>\n          \xe2\x94\x82    \xe2\x94\x82                \xe2\x94\x94 <function Pool.release at 0x10b956a60>\n          \xe2\x94\x82    \xe2\x94\x94 <asyncpg.pool.Pool object at 0x10bb131e0>\n          \xe2\x94\x94 <chd_api.data.db.DBConnectionManager object at 0x10b946a30>\n\n  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 666, in release\n    return await asyncio.shield(ch.release(timeout))\n                 \xe2\x94\x82       \xe2\x94\x82      \xe2\x94\x82  \xe2\x94\x82       \xe2\x94\x94 None\n                 \xe2\x94\x82       \xe2\x94\x82      \xe2\x94\x82  \xe2\x94\x94 <function PoolConnectionHolder.release at 0x10b952e50>\n                 \xe2\x94\x82       \xe2\x94\x82      \xe2\x94\x94 <asyncpg.pool.PoolConnectionHolder object at 0x10bb2a5c0>\n                 \xe2\x94\x82       \xe2\x94\x94 <function shield at 0x10a1faee0>\n                 \xe2\x94\x94 <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>\n  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 218, in release\n    raise ex\n  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 208, in release\n    await self._con.reset(timeout=budget)\n          \xe2\x94\x82    \xe2\x94\x82                  \xe2\x94\x94 None\n          \xe2\x94\x82    \xe2\x94\x94 <member '_con' of 'PoolConnectionHolder' objects>\n          \xe2\x94\x94 <asyncpg.pool.PoolConnectionHolder object at 0x10bb2a5c0>\n  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/connection.py", line 1311, in reset\n    await self.execute(reset_query, timeout=timeout)\n          \xe2\x94\x82    \xe2\x94\x82       \xe2\x94\x82                    \xe2\x94\x94 None\n          \xe2\x94\x82    \xe2\x94\x82       \xe2\x94\x94 'SELECT pg_advisory_unlock_all();\\nCLOSE ALL;\\nUNLISTEN *;\\nRESET ALL;'\n          \xe2\x94\x82    \xe2\x94\x94 <function Connection.execute at 0x10b93f3a0>\n          \xe2\x94\x94 <asyncpg.connection.Connection object at 0x10bc34120>\n  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/connection.py", line 297, in execute\n    return await self._protocol.query(query, timeout)\n                 \xe2\x94\x82    \xe2\x94\x82               \xe2\x94\x82      \xe2\x94\x94 None\n                 \xe2\x94\x82    \xe2\x94\x82               \xe2\x94\x94 'SELECT pg_advisory_unlock_all();\\nCLOSE ALL;\\nUNLISTEN *;\\nRESET ALL;'\n                 \xe2\x94\x82    \xe2\x94\x94 <member '_protocol' of 'Connection' objects>\n                 \xe2\x94\x94 <asyncpg.connection.Connection object at 0x10bc34120>\n  File "asyncpg/protocol/protocol.pyx", line 321, in query\n    self._check_state()\n  File "asyncpg/protocol/protocol.pyx", line 684, in asyncpg.protocol.protocol.BaseProtocol._check_state\n    raise apg_exc.InterfaceError(\n          \xe2\x94\x82       \xe2\x94\x94 <class 'asyncpg.exceptions._base.InterfaceError'>\n          \xe2\x94\x94 <module 'asyncpg.exceptions' from '/Users/ddd/Desktop/repos/chd-api/.venv/lib/python3.8/site-packages/asyncpg/exception...\n\nasyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress\n
Run Code Online (Sandbox Code Playgroud)\n

我有以下代码来设置连接池并使用池中的连接执行查询:

\n
class DBConnectionManager(object):\n    """ Class for setting up and tearing down db connection """\n\n    def __init__(self):\n        self.host = SETTINGS.db_host\n        self.database = SETTINGS.db_name\n        self.user = SETTINGS.db_user\n        self.password = SETTINGS.db_password\n        self.port = "5432"\n\n        self._connection_pool = None\n        self.con = None\n\n    async def connect(self):\n        if not self._connection_pool:\n            try:\n                self._connection_pool = await asyncpg.create_pool(\n                    host=self.host,\n                    database=self.database,\n                    user=self.user,\n                    password=self.password,\n                    port=self.port,\n                    min_size=50,\n                    max_size=100,\n                )\n                logger.info("Database pool connection opened")\n\n            except Exception as e:\n                logger.exception(e)\n\n    async def fetch_item(self, query: str, *args):\n        if not self._connection_pool:\n            await self.connect()\n        else:\n            self.con = await self._connection_pool.acquire()\n            try:\n                result = await self.con.fetch(query, *args)\n                return result\n            except Exception as e:\n                logger.exception(e)\n            finally:\n                await self._connection_pool.release(self.con)\n\n    async def close(self):\n        if not self._connection_pool:\n            try:\n                await self._connection_pool.close()\n                logger.info("Database pool connection closed")\n            except Exception as e:\n                logger.exception(e)\n
Run Code Online (Sandbox Code Playgroud)\n

并尝试使用以下命令执行大约 22 个数据库调用:

\n
async def db_call(db, lat, lng):\n    """\n    Performs the necessary db calls given a lat, lng\n\n    Required Input:\n        lat::float a latitude in decimal degrees. Must be specified with `lng` (i.e. 39.2994)\n        lng::float a longitude in decimal degrees. Must be specified with `lat` (i.e. -122.33)\n\n    Returns:\n        dict\n    """\n    coroutines = []\n    for table in db_map:\n\n        # SQL columns\n        db_fields = ",".join(\n            [\n                f"{col} AS {db_map[table]['fields'][col]}"\n                for col in db_map[table]["fields"]\n            ]\n        )\n\n        # Output names\n        api_fields = [db_map[table]["fields"][col] for col in db_map[table]["fields"]]\n\n        if db_map[table]["query_type"] == "pip":\n            limit = db_map[table]["options"]["LIMIT"]\n            query = f"SELECT {db_fields} from {table} WHERE (ST_Covers(geom, GeomFromEWKT('SRID=4326;POINT({lng} {lat})'))) LIMIT {limit};"\n\n        else:\n            distance = db_map[table]["options"]["DISTANCE"]\n            geo2geo = f"geom::geography, GeomFromEWKT('SRID=4326;POINT({lng} {lat})')::geography"\n            query = (\n                f"SELECT {db_fields}, ST_Distance({geo2geo})"\n                f"from {table} WHERE (ST_DWithin({geo2geo}, {distance}))"\n                f"ORDER BY ST_Distance({geo2geo}) LIMIT 1;"\n            )\n\n        coroutines.append(db.fetch_item(query))\n\n    db_res = await asyncio.gather(*coroutines)\n    \n    .... code for processing results\n\n
Run Code Online (Sandbox Code Playgroud)\n

我已经检查了 asyncpg github 上有关此错误的几个问题,但仍然没有找到合适的解决方案。另请注意,此调用是在 FastAPI 中执行的。任何关于为什么会发生此错误/解决该错误的步骤的指导将不胜感激。

\n

use*_*342 21

self.con对in的赋值fetch_item会导致多个协程共享同一个连接。虽然您确实希望它们共享连接池,但共享相同的连接是没有意义的,因为连接是有状态的。

要解决此问题,请将 的用法替换self.con为局部变量con