确保只有一个工作人员在运行多个工作人员的金字塔Web应用程序中启动apscheduler事件

Ran*_*dra 24 django wsgi pyramid gunicorn apscheduler

我们有一个用金字塔制作的网络应用程序,并通过gunicorn + nginx提供服务.它适用于8个工作线程/进程

我们需要工作,我们选择了apscheduler.这是我们如何推出它

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.scheduler import Scheduler

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)
Run Code Online (Sandbox Code Playgroud)

问题是gunicorn的所有工作进程都选择了调度程序.我们尝试实现文件锁,但它似乎不是一个足够好的解决方案.什么是最好的方法来确保在任何给定的时间只有一个工作进程选择预定的事件,没有其他线程选择到下一个JOB_INTERVAL

如果我们决定稍后切换到apache2 + modwsgi,解决方案甚至需要使用mod_wsgi.它需要与作为服务员的单进程开发服务器一起工作.

来自赏金赞助商的更新

我正面临OP所描述的相同问题,只需使用Django应用程序.我最有把握的是,如果原始问题,这个细节不会有太大变化.出于这个原因,为了获得更多的可见性,我还标记了这个问题django.

The*_*inn 21

由于Gunicorn开始用8名工人(在你的例子),该应用8次进8点的过程.这8个进程是从进程分叉的,进程监视每个进程的状态并能够添加/删除工作程序.

每个进程都会获取APScheduler对象的副本,该对象最初是主进程的APScheduler的精确副本.这导致每个"第n"工人(进程)总共执行每个作业"n"次.

解决这个问题的方法是使用以下选项运行gunicorn:

env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload
Run Code Online (Sandbox Code Playgroud)

--preload标志告诉Gunicorn" 在分配工作进程之前加载应用程序 ".通过这样做,每个工作人员"被给予应用程序的副本,已经由主人实例化,而不是实例化应用程序本身".这意味着以下代码仅在主进程中执行一次:

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)
Run Code Online (Sandbox Code Playgroud)

另外,我们需要将jobstore设置为以下内容:memory :.这样,虽然每个worker都是自己独立的进程,无法与其他7进行通信,但是通过使用本地数据库(而不是内存),我们保证单个 - 在jobstore上进行CRUD操作的真实点.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = Scheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)
Run Code Online (Sandbox Code Playgroud)

最后,我们想要使用BackgroundScheduler,因为它的实现start().当我们调用start()BackgroundScheduler时,会在后台启动一个新线程,该线程负责调度/执行作业.这很重要,因为记住在步骤(1)中,由于我们的--preload标志,我们只start()在Master Gunicorn过程中执行一次该函数.根据定义,分叉进程不会继承其父级的线程,因此每个工作程序都不会运行BackgroundScheduler线程.

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)
Run Code Online (Sandbox Code Playgroud)

所有这一切的结果是,每个Gunicorn工作人员都有一个被欺骗进入"STARTED"状态的APScheduler,但实际上并没有运行,因为它丢弃了它父母的线程!每个实例也能够更新jobstore数据库,只是不执行任何作业!

查看flask-APScheduler,以便在Web服务器(如Gunicorn)中快速运​​行APScheduler,并为每个作业启用CRUD操作.

  • 如果你有一个 gunicorn python 配置文件,你可以创建一个名为 `on_starting` 的函数,它只会由 gunicorn 主进程而不是工作进程执行。您还可以在那里创建和启动调度程序。查看 https://github.com/mlsecproject/gglsbl-rest/blob/master/config.py 以获取示例。 (5认同)

Pao*_*olo 13

我找到了一个与Django项目有关的修复程序.我只是在调度程序第一次启动时绑定TCP套接字,然后再对其进行检查.我认为以下代码也适用于您,并进行了一些小调整.

import sys, socket

try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 47200))
except socket.error:
    print "!!!scheduler already started, DO NOTHING"
else:
    from apscheduler.schedulers.background import BackgroundScheduler
    scheduler = BackgroundScheduler()
    scheduler.start()
    print "scheduler started"
Run Code Online (Sandbox Code Playgroud)


Xua*_*uan 5

简短的回答:你不可能在没有后果的情况下正确地做这件事。

我使用 Gunicorn 作为示例,但 uWSGI 本质上是相同的。运行多个进程时存在各种黑客行为,仅举几例:

  1. 使用--preload选项
  2. 使用on_startinghook启动APScheduler后台调度程序
  3. 使用when_readyhook启动APScheduler后台调度程序

它们在一定程度上可以工作,但可能会出现以下错误:

  1. 工人经常超时
  2. 没有作业时调度程序挂起https://github.com/agronholm/apscheduler/issues/305

APScheduler 设计为在单个进程中运行,它可以完全控制将作业添加到作业存储的过程。它使用threading.Eventwait()方法set()来协调。如果它们由不同的进程运行,则协调将无法进行。

可以在 Gunicorn 中以单个进程运行它。

  1. 仅使用一个工作进程
  2. 使用post_worker_init钩子来启动调度程序,这将确保调度程序仅在工作进程中运行,而不在主进程中运行

作者还指出,多个进程共享作业存储量是不可能的。https://apscheduler.readthedocs.io/en/stable/faq.html#how-do-i-share-a-single-job-store-among-one-or-more-worker-processes他还提供了一个解决方案使用 RPyC。

虽然用 REST 接口包装 APScheduler 是完全可行的。您可能需要考虑将其作为一个独立的应用程序提供给一名工作人员。换句话说,如果您有其他端点,请将它们放在另一个可以使用多个工作人员的应用程序中。