Automatically reconnect to a PostgreSQL database

iah*_*med 6 python postgresql

I have the following code:

import psycopg2
conn = psycopg2.connect(database="****", user="postgres", password="*****", host="localhost", port="5432")
print ("Opened database successfully")
cur = conn.cursor()
cur.execute('''select * from xyz''')
print ("Table created successfully")
conn.commit()
conn.close()

I have some 50 to 60 complex queries like this, but the problem is that sometimes the PostgreSQL database throws an error while they run.

It looks like the connection is lost. How can I reconnect to the PostgreSQL database automatically? Thanks for your help.

Clo*_*eto 10

Catch the exception and reconnect:

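import psycopg2

while True:
    try:
        # Open a fresh connection on every attempt; connect() itself can raise OperationalError
        conn = psycopg2.connect(database="****", user="postgres", password="*****", host="localhost", port="5432")
        cur = conn.cursor()
        cur.execute('''select * from xyz''')
    except psycopg2.OperationalError:
        # Connection lost or refused: go back to the top of the loop and reconnect
        continue
    break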

  • It does not go back to the try block. Instead, it goes back to the start of the "while" loop. (2 upvotes)
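A loop like the one above retries forever and without any pause, which can hammer a server that is down. Here is a minimal sketch of a bounded variant with exponential backoff. The helper name `run_with_reconnect` and its parameters (`attempts`, `base_delay`, `sleep`) are my own assumptions, not part of psycopg2; the connection factory and the exception type are passed in so the helper does not depend on any particular driver.

```python
import time


def run_with_reconnect(connect, query, retry_on, attempts=5, base_delay=0.5, sleep=time.sleep):
    """Run `query` on a fresh connection, reconnecting when `retry_on` is raised.

    `connect` is any zero-argument callable returning a DB-API connection
    (hypothetical helper; all names here are illustrative).
    """
    last_error = None
    for attempt in range(attempts):
        conn = None
        try:
            conn = connect()          # may itself fail while the server is down
            cur = conn.cursor()
            cur.execute(query)
            return cur.fetchall()
        except retry_on as exc:
            last_error = exc
            if attempt < attempts - 1:
                # Back off: base_delay, 2 * base_delay, 4 * base_delay, ...
                sleep(base_delay * 2 ** attempt)
        finally:
            if conn is not None:
                try:
                    conn.close()      # never leak a half-dead connection
                except Exception:
                    pass
    raise last_error
```

With psycopg2 this could be called roughly as `run_with_reconnect(lambda: psycopg2.connect(...), "select * from xyz", psycopg2.OperationalError)`. Opening a new connection per call is simple but not free; for 50 to 60 queries you may prefer to keep one connection and only reopen it after a failure.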

vla*_*mir 7

I would rely on decorators, one to retry and one to reconnect:

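import functools
import psycopg2
from tenacity import retry, wait_exponential, stop_after_attempt
from typing import Callable

def reconnect(f: Callable):
    # "DbStorage" is quoted below because the class is defined further down
    @functools.wraps(f)  # keep the wrapped method's name and docstring
    def wrapper(storage: "DbStorage", *args, **kwargs):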
        if not storage.connected():
            storage.connect()

        try:
            return f(storage, *args, **kwargs)
        except psycopg2.Error:
            storage.close()
            raise

    return wrapper


class DbStorage:

    def __init__(self, conn: str):
        self._conn: str = conn
        self._connection = None

    def connected(self) -> bool:
        return self._connection is not None and self._connection.closed == 0

    def connect(self):
        self.close()
        self._connection = psycopg2.connect(self._conn)

    def close(self):
        if self.connected():
            # noinspection PyBroadException
            try:
                self._connection.close()
            except Exception:
                pass

        self._connection = None

    @retry(stop=stop_after_attempt(3), wait=wait_exponential())
    @reconnect
    def get_data(self): # pass here required params to get data from DB
        # ..
        cur = self._connection.cursor()
        cur.execute('''select * from xyz''')
        # ..
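To see why the decorator order matters (retry on the outside, reconnect on the inside), here is a small self-contained sketch that runs without a database. Everything in it is a stand-in: the `retry` function below is a simplified, hypothetical substitute for `tenacity.retry` with no waiting, `FakeStorage` plays the role of `DbStorage`, and `ConnectionError` stands in for `psycopg2.Error`.

```python
import functools


def retry(attempts):
    """Simplified stand-in for tenacity.retry: call again up to `attempts` times."""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return f(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: let the caller see the error
        return wrapper
    return decorator


def reconnect(f):
    """Open the connection if needed; drop it when the call fails."""
    @functools.wraps(f)
    def wrapper(storage, *args, **kwargs):
        if not storage.connected():
            storage.connect()
        try:
            return f(storage, *args, **kwargs)
        except ConnectionError:
            storage.close()  # force a fresh connection on the next attempt
            raise
    return wrapper


class FakeStorage:
    """Stand-in for DbStorage; fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.is_open = False
        self.connect_calls = 0

    def connected(self):
        return self.is_open

    def connect(self):
        self.is_open = True
        self.connect_calls += 1

    def close(self):
        self.is_open = False

    @retry(attempts=3)
    @reconnect
    def get_data(self):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("connection lost")
        return ["row"]


storage = FakeStorage(failures=2)
print(storage.get_data(), storage.connect_calls)  # prints: ['row'] 3
```

Each failed attempt closes the connection inside `reconnect` and re-raises, so the outer `retry` calls the method again and `reconnect` opens a new connection first. With the order reversed, the retries would all run against the same broken connection.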