重试失败的 sqlalchemy 查询

Mug*_*gen 6 python mysql sqlalchemy flask-sqlalchemy retry-logic

每次我重新启动mysql服务时,我的应用程序都会在任何查询中收到以下错误:

result = self._query(query)
  File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 516, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 727, in _read_query_result
    result.read()
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 1066, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 656, in _read_packet
    packet_header = self._read_bytes(4)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 702, in _read_bytes
    CR.CR_SERVER_LOST, "Lost connection to MySQL server during query")
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query') [SQL: ...] [parameters: {...}] (Background on this error at: http://sqlalche.me/e/e3q8)
Run Code Online (Sandbox Code Playgroud)

之后的任何查询都会像往常一样成功。

例如,这只是一个常见用例,通常我可能想根据错误重试任何查询。

有没有办法在一些低级sqlalchemyapi中捕获并重试查询?query在我的代码中执行 try-except 或自定义方法是不合理的,因为我使用它太多次并且无法维护。

小智 10

非常感谢这个片段,我必须对其进行一些调整才能直接与 sqlalchemy.orm 一起使用:如果它对任何人都有用......

from sqlalchemy.exc import OperationalError, StatementError
from sqlalchemy.orm.query import Query as _Query
from time import sleep

class RetryingQuery(_Query):
    __max_retry_count__ = 3

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                if "server closed the connection unexpectedly" not in str(ex):
                    raise
                if attempts <= self.__max_retry_count__:
                    sleep_for = 2 ** (attempts - 1)
                    logging.error(
                        "/!\ Database connection error: retrying Strategy => sleeping for {}s"
                    " and will retry (attempt #{} of {}) \n Detailed query impacted: {}".format(
                        sleep_for, attempts, self.__max_retry_count__, ex)
                )
                    sleep(sleep_for)
                    continue
                else:
                    raise
            except StatementError as ex:
                if "reconnect until invalid transaction is rolled back" not in str(ex):
                    raise
                self.session.rollback()
Run Code Online (Sandbox Code Playgroud)

对于使用:将选项传递给 sessionmaker:

sqlalchemy.orm.sessionmaker(bind=engine, query_cls=RetryingQuery)
Run Code Online (Sandbox Code Playgroud)


Mug*_*gen 6

显然 sqlalchemy有一个很好的选择来自定义查询类,这正是我所需要的。

类实现:

import logging
from flask_sqlalchemy import BaseQuery
from sqlalchemy.exc import OperationalError
from time import sleep

class RetryingQuery(BaseQuery):

    __retry_count__ = 3
    __retry_sleep_interval_sec__ = 0.5

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                if "Lost connection to MySQL server during query" not in str(ex):
                    raise
                if attempts < self.__retry_count__:
                    logging.debug(
                        "MySQL connection lost - sleeping for %.2f sec and will retry (attempt #%d)",
                        self.__retry_sleep_interval_sec__, attempts
                    )
                    sleep(self.__retry_sleep_interval_sec__)
                    continue
                else:
                    raise
Run Code Online (Sandbox Code Playgroud)

用法:

class BaseModel(Model):
    ...
    query_class = RetryingQuery
    ...

db = SQLAlchemy(model_class=BaseModel, query_class=RetryingQuery)
Run Code Online (Sandbox Code Playgroud)

  • 你好 - SQLAlchemy 作者在这里。我刚刚被指给了这个食谱。我强烈建议不要使用上述模式。连接的“重试”应该仅在事务的顶层执行,这就是池 pre_ping 功能的用途。如果在事务过程中失去连接,则需要重新运行整个操作。显式优于隐式。 (9认同)
  • @zzzeek 感谢您指出这一点。事实上,明确的更好,我今天也会避免这种方法。尽管我确实遇到了根本不可能采用明确方法的环境。 (2认同)