"ResourceClosedError:交易已关闭",芹菜节拍和sqlalchemy +金字塔应用程序错误

Ran*_*dra 2 sqlalchemy celery pyramid

我有一个金字塔应用程序叫mainsite.

该站点以非常异步的方式工作,主要是通过从视图启动的线程来执行后端操作.

它使用sqlalchemy连接到mysql,并使用ZopeTransactionExtension进行会话管理.

到目前为止,应用程序运行良好.

我需要在其上运行定期作业,它需要使用从视图中启动的一些相同的异步函数.

我使用了apscheduler,但遇到了问题.所以我想把celery beat作为一个单独的过程,将mainapp视为一个库并导入要使用的函数.

我的芹菜配置看起来像这样:

from datetime import timedelta
from api.apiconst import RERUN_CHECK_INTERVAL, AUTOMATION_CHECK_INTERVAL, \
    AUTH_DELETE_TIME

BROKER_URL = 'sqla+mysql://em:em@localhost/edgem'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://em:em@localhost/edgem'

CELERYBEAT_SCHEDULE = {
    'rerun': {
        'task': 'tasks.rerun_scheduler',
        'schedule': timedelta(seconds=RERUN_CHECK_INTERVAL)
    },
    'automate': {
        'task': 'tasks.automation_scheduler',
        'schedule': timedelta(seconds=20)
    },
    'remove-tokens': {
        'task': 'tasks.token_remover_scheduler',
        'schedule': timedelta(seconds=2 * 24 * 3600 )
    },
}

CELERY_TIMEZONE = 'UTC'
Run Code Online (Sandbox Code Playgroud)

tasks.py是

from celery import Celery
celery = Celery('tasks')
celery.config_from_object('celeryconfig')


@celery.task
def rerun_scheduler():
    from mainsite.task import check_update_rerun_tasks
    check_update_rerun_tasks()


@celery.task
def automation_scheduler():
    from mainsite.task import automate
    automate()


@celery.task
def token_remover_scheduler():
    from mainsite.auth_service import delete_old_tokens
    delete_old_tokens()
Run Code Online (Sandbox Code Playgroud)

请记住,所有上述功能都会立即返回,但如果需要,则启动线程

线程通过执行将对象保存到db中transaction.commit() after session.add(object).

问题是整个事情就像宝石一样只有大约30分钟.之后,ResourceClosedError: The transaction is closed错误开始发生在任何地方transaction.commit().我不确定是什么问题,我需要帮助排除故障.

我在任务中导入的原因是为了摆脱这个错误.考虑每次需要运行任务时导入都是一个好主意,我每次都可以获得一个新的事务,但看起来并非如此.

Ser*_*gey 9

根据我的经验,尝试重用配置为与Pyramid一起使用的会话(使用ZopeTransactionExtension等)与Celery工作程序会导致难以调试的混乱.

ZopeTransactionExtension将SQLAlchemy会话绑定到Pyramid的请求 - 响应周期 - 事务自动启动并提交或回滚,您通常不应在代码中使用transaction.commit() - 如果一切正常,ZTE将提交所有内容,如果您的代码引发和异常您的事务将被回滚.

使用Celery,您需要手动管理SQLAlchemy会话,中兴通讯阻止您这样做,因此您需要以DBSession不同的方式配置.

像这样简单的东西会起作用:

DBSession = None

def set_dbsession(session):
    global DBSession
    if DBSession is not None:
        raise AttributeError("DBSession has been already set to %s!" % DBSession)

    DBSession = session
Run Code Online (Sandbox Code Playgroud)

然后从金字塔启动代码你做

def main(global_config, **settings):
    ...
    set_dbsession(scoped_session(sessionmaker(extension=ZopeTransactionExtension())))
Run Code Online (Sandbox Code Playgroud)

使用Celery它有点棘手 - 我最终为Celery创建了一个自定义启动脚本,我在其中配置了会话.

setup.py该的worker蛋:

  entry_points="""
  # -*- Entry points: -*-
  [console_scripts]
  custom_celery = worker.celeryd:start_celery
  custom_celerybeat = worker.celeryd:start_celerybeat
  """,
  )
Run Code Online (Sandbox Code Playgroud)

worker/celeryd.py:

def initialize_async_session(db_string, db_echo):

    import sqlalchemy as sa
    from db import Base, set_dbsession

    session = sa.orm.scoped_session(sa.orm.sessionmaker(autoflush=True, autocommit=True))
    engine = sa.create_engine(db_string, echo=db_echo)
    session.configure(bind=engine)

    set_dbsession(session)
    Base.metadata.bind = engine


def start_celery():
    initialize_async_session(DB_STRING, DB_ECHO)
    import celery.bin.celeryd
    celery.bin.celeryd.main()
Run Code Online (Sandbox Code Playgroud)

如果您计划将应用程序部署到生产服务器,那么您使用"从视图中启动线程以执行后端操作"的一般方法对我来说会有点危险 - Web服务器经常回收,杀死或创建新的"工人"通常不能保证每个特定的过程能够在当前的请求 - 响应周期之后存活.我从来没有试过这样做,所以也许你会好的:)