celery,flask sqlalchemy:DatabaseError:(DatabaseError)SSL错误:解密失败或错误记录mac

Ank*_*kit 14 sqlalchemy celery python-2.7 flask-sqlalchemy

嗨,我有一个设置,我正在使用Celery Flask SqlAlchemy,我间歇性地收到此错误:

 (psycopg2.DatabaseError) SSL error: decryption failed or bad record mac
Run Code Online (Sandbox Code Playgroud)

我关注这篇文章:

Celery + SQLAlchemy:DatabaseError:(DatabaseError)SSL错误:解密失败或错误记录mac

还有一些,并添加了prerun和postrun方法:

@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask SQLAlchemy will automatically create new sessions for you from 
    # a scoped session factory, given that we are maintaining the same app
    # context, this ensures tasks have a fresh session (e.g. session errors 
    # won't propagate across tasks)
    d.session.remove()

@task_prerun.connect
def on_task_init(*args, **kwargs):
    d.engine.dispose()
Run Code Online (Sandbox Code Playgroud)

但我仍然看到这个错误.有人解决了吗?

请注意,我在AWS上运行它(两个服务器访问相同的数据库).数据库本身托管在自己的服务器(而不是RDS)上.我相信总的芹菜背景任务是6(2 + 4).Flask前端使用gunicorn运行.

我的相关主题:https: //github.com/celery/celery/issues/3238#issuecomment-225975220

Bor*_*rov 1

这是我的评论以及其他信息:

我在AWS上使用Celery、SQLAlchemy和PostgreSQL,没有这样的问题。我能想到的唯一区别是我的数据库位于 RDS 上。我认为您可以尝试临时切换到 RDS,只是为了测试问题是否仍然存在。如果它随着 RDS 消失,那么您需要查看 PostgreSQL 设置。

根据RDS参数,我启用了SSL:

ssl = 1, Enables SSL connections.
ssl_ca_file = /rdsdbdata/rds-metadata/ca-cert.pem
ssl_cert_file = /rdsdbdata/rds-metadata/server-cert.pem
ssl_ciphers = false, Sets the list of allowed SSL ciphers.
ssl_key_file = /rdsdbdata/rds-metadata/server-key.pem
ssl_renegotiation_limit = 0, integer, (kB) Set the amount of traffic to send and receive before renegotiating the encryption keys.
Run Code Online (Sandbox Code Playgroud)

至于Celery初始化代码,大致是这样的

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

import sqldb

engine = sqldb.get_engine()
cached_data = None

def do_the_work():
    global engine, ruckus_data
    if cached_data is not None:
        return cached_data
    db_session = None
    try:
        db_session = scoped_session(sessionmaker(
            autocommit=False, autoflush=False, bind=engine))
        data = sqldb.get_session().query(
            sqldb.system.MyModel).filter_by(
                my_type = sqldb.system.MyModel.TYPEA).all()
        cached_data = {}
        for row in data:
            ... # put row into cached_data
    finally:
        if db_session is not None:
            db_session.remove()
    return cached_data
Run Code Online (Sandbox Code Playgroud)

do_the_work然后从 celery 任务调用该函数。看起来sqldb.get_engine像这样:

from sqlalchemy import create_engine

_engine = None

def get_engine():
    global _engine
    if _engine:
        return _engine
    _engine = create_engine(config.SQL_DB_URL, echo=config.SQL_DB_ECHO)
    return _engine
Run Code Online (Sandbox Code Playgroud)

最后,配置模块中的 SQL_DB_URI 和 SQL_DB_ECHO 如下:

SQL_DB_URL = 'postgresql+psycopg2://%s:%s@%s/%s' % (
    POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_DB_NAME)
SQL_DB_ECHO = False
Run Code Online (Sandbox Code Playgroud)