标签: apscheduler

Flask中的apscheduler执行两次

我在烧瓶应用程序中使用apscheduler时遇到问题.

在我的view.py文件中,我写的是这样的

import time
from apscheduler.scheduler import Scheduler

def test_scheduler():
     print "TEST"
     print time.time()


sched = Scheduler()
sched.add_interval_job(test_scheduler, seconds=5)
sched.start()
Run Code Online (Sandbox Code Playgroud)

然后这个方法test_scheduler()每五秒执行两次

测试1360844314.01测试1360844314.2

python flask apscheduler

34
推荐指数
4
解决办法
2万
查看次数

如何在Python Flask框架中运行重复任务?

我正在建立一个向访问者提供一些信息的网站.此信息通过每5秒轮询一对外部API在后台聚合.我现在的工作方式是我使用APScheduler工作.我最初更喜欢APScheduler,因为它使整个系统更容易移植(因为我不需要在新机器上设置cron作业).我启动轮询功能如下:

from apscheduler.scheduler import Scheduler

@app.before_first_request
def initialize():
    apsched = Scheduler()
    apsched.start()

    apsched.add_interval_job(checkFirstAPI, seconds=5)
    apsched.add_interval_job(checkSecondAPI, seconds=5)
    apsched.add_interval_job(checkThirdAPI, seconds=5)
Run Code Online (Sandbox Code Playgroud)

这种方法有用,但它有一些问题:

  1. 对于初学者来说,这意味着interval-jobs在Flask上下文之外运行.到目前为止,这并没有太大问题,但是当调用端点失败时,我希望系统向我发送一封电子邮件(说"嘿,调用API X失败").但是因为它不在Flask上下文中运行,所以它抱怨无法执行flask-mail(RuntimeError('working outside of application context')).
  2. 其次,我想知道当我不再使用Flask内置调试服务器时这将会如何表现,但生产服务器可以说4个工作人员.那会是每次工作四次吗?

总而言之,我觉得应该有更好的方法来运行这些重复的任务,但我不确定如何.有没有人有这个问题的有趣解决方案?欢迎所有提示!

[编辑]我刚刚阅读了Celery及其时间表.虽然我没有真正看到Celery与APScheduler的区别,以及它是否可以解决我的两点,但我想知道是否有人在阅读这篇文章时认为我应该在Celery中进行更多调查?

[结论]大约两年后,我正在读这篇文章,我想我可以让你们知道我最终得到了什么.我认为@BluePeppers说我不应该与Flask生态系统紧密联系.所以我选择使用Ansible设置的每分钟运行的常规cron-jobs.虽然这使它变得有点复杂(我需要学习Ansible并转换一些代码以便每分钟运行它就足够了)我认为这更加强大.我目前正在使用令人敬畏的pythonr-rq来排队同步作业(检查API和发送电子邮件).我刚刚发现了rq-scheduler.我还没有测试过,但它似乎正是我所需要的.所以这可能是未来读者对这个问题的一个提示.

其余的,我希望你们大家度过美好的一天!

python cron flask apscheduler

26
推荐指数
1
解决办法
2万
查看次数

确保只有一个工作人员在运行多个工作人员的金字塔Web应用程序中启动apscheduler事件

我们有一个用金字塔制作的网络应用程序,并通过gunicorn + nginx提供服务.它适用于8个工作线程/进程

我们需要工作,我们选择了apscheduler.这是我们如何推出它

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.scheduler import Scheduler

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)
Run Code Online (Sandbox Code Playgroud)

问题是gunicorn的所有工作进程都选择了调度程序.我们尝试实现文件锁,但它似乎不是一个足够好的解决方案.什么是最好的方法来确保在任何给定的时间只有一个工作进程选择预定的事件,没有其他线程选择到下一个JOB_INTERVAL

如果我们决定稍后切换到apache2 + modwsgi,解决方案甚至需要使用mod_wsgi.它需要与作为服务员的单进程开发服务器一起工作.

来自赏金赞助商的更新

我正面临OP所描述的相同问题,只需使用Django应用程序.我最有把握的是,如果原始问题,这个细节不会有太大变化.出于这个原因,为了获得更多的可见性,我还标记了这个问题django.

django wsgi pyramid gunicorn apscheduler

24
推荐指数
3
解决办法
8242
查看次数

RuntimeError:async + apscheduler中的线程中没有当前事件循环

我有一个异步功能,需要每隔N分钟使用apscheduller运行一次.下面有一个python代码

URL_LIST = ['<url1>',
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = [] # dictionary of start times for each url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a …
Run Code Online (Sandbox Code Playgroud)

python python-3.x apscheduler python-asyncio aiohttp

24
推荐指数
5
解决办法
3万
查看次数

python apscheduler - 跳过:达到的最大运行实例数

我正在使用Python apscheduler(版本3.0.1)每秒执行一个函数

代码:

scheduler = BackgroundScheduler()
scheduler.add_job(runsync, 'interval', seconds=1)
scheduler.start()
Run Code Online (Sandbox Code Playgroud)

它大部分时间都很好,但有时我会收到这个警告:

WARNING:apscheduler.scheduler:Execution of job "runsync (trigger: interval[0:00:01], next run at: 2015-12-01 11:50:42 UTC)" skipped: maximum number of running instances reached (1)
Run Code Online (Sandbox Code Playgroud)

1.这是执行此方法的正确方法吗?

这个警告意味着什么?它是否会影响函数内部的任务执行?

3.如何处理这个?

python cron scheduler apscheduler

13
推荐指数
3
解决办法
1万
查看次数

在Flask-APScheduler作业中查询模型会引发应用程序上下文RuntimeError

我想用Flask-APScheduler运行一个查询Flask-SQLAlchemy模型的工作.当工作开始时,我明白了RuntimeError: application not registered on db instance and no application bound to current context.如何运行查询数据库的作业.

from flask_apscheduler import APScheduler

scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
Run Code Online (Sandbox Code Playgroud)
from models import User

def my_job():
    user = User.query.first()
    print(user)
Run Code Online (Sandbox Code Playgroud)

查询期间发生错误,然后才能打印.数据库正在其他应用程序中工作以进行其他查询.

我试图with app.app_context():在设置扩展时添加,但这不起作用.

with app.app_context()
    scheduler = APScheduler()
    scheduler.init_app(app)
    scheduler.start()
Run Code Online (Sandbox Code Playgroud)

完整的追溯是:

ERROR:apscheduler.executors.default:Job "run_in (trigger: interval[0:00:10], next run at: 2016-10-18 23:00:53 CEST)" raised an exception
Traceback (most recent call last):
  File "/Users/user/.virtualenvs/myfolder/lib/python2.7/site-packages/apscheduler/executors/base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "/Users/user/Documents/myfolder/myfolder/myfile.py", line 19, …
Run Code Online (Sandbox Code Playgroud)

python flask flask-sqlalchemy apscheduler

11
推荐指数
2
解决办法
2638
查看次数

在uwsgi应用程序中启动APScheduler最终会为每个工作者安排一个调度程序吗?

我有一个烧瓶应用程序,我需要APScheduler的调度功能.问题是:

我从哪里开始调度程序实例?

我使用uwsgi + nginx为多个工作者提供这个应用程序,我不会最终得到多个彼此无视的调度程序实例吗?如果这是正确的,单个作业将被多次触发,不是吗?

在这种情况下,最好的策略是什么,所以我最终只得到一个Scheduler实例,并且仍然可以从预定的作业中访问应用程序的上下文?

这个问题虽然有枪炮而不是uwsgi,但也有同样的问题,但答案可能类似.

下面是将"app"定义为uwsgi可调用应用程序对象的代码.包含此代码的文件称为wsgi.py(并不重要).

app = create_app(config=ProductionConfig())

def job_listener(event):
    get_ = "msg from job '%s'" % (event.job)
    logging.info(get_)

# This code below never gets invoked when I check with worker_id() == 1
# The only time it is run is with worker_id() value of 0
app.sched = Scheduler()
app.sched.add_jobstore(ShelveJobStore('/tmp/apsched_%d' % uwsgi.worker_id()), 'file')
app.sched.add_listener(job_listener,
                   events.EVENT_JOB_EXECUTED |
                   events.EVENT_JOB_MISSED |
                   events.EVENT_JOB_ERROR)
app.sched.start()
Run Code Online (Sandbox Code Playgroud)

python worker flask uwsgi apscheduler

10
推荐指数
1
解决办法
2736
查看次数

添加 misfire_grace_time 后 APScheduler 丢失作业

我正在运行一个 BlockingScheduler 进程,它应该运行多个cron作业,但每次都无法运行并显示以下消息:

Run time of job "validation (trigger: cron[hour='3'], next run at: 2016-12-30 03:00:00 CST)" was missed by 0:00:02.549821

我有以下设置:

sched = BlockingScheduler(misfire_grace_time=3600, coalesce=True)
sched.add_jobstore('mongodb', collection='my_jobs')

@sched.scheduled_job('cron', hour=3, id='validation')
def validation():
    rep = Myclass()
    rep.run()

if __name__ == '__main__':
    sched.start()
Run Code Online (Sandbox Code Playgroud)

我认为添加misfire_grace_time可以解决问题,但每项工作仍然无法运行。

python cron apscheduler

10
推荐指数
2
解决办法
4542
查看次数

Django 在遥远的未来运行任务(可能)

假设我有一个模型Event。我想在活动结束后向所有受邀用户发送通知(电子邮件、推送等)。类似的东西:

class Event(models.Model):
    start = models.DateTimeField(...)
    end = models.DateTimeField(...)
    invited = models.ManyToManyField(model=User)

    def onEventElapsed(self):
        for user in self.invited:
           my_notification_backend.sendMessage(target=user, message="Event has elapsed")

Run Code Online (Sandbox Code Playgroud)

现在,当然,关键部分是调用onEventElapsedwhen timezone.now() >= event.end。请记住,end距离当前日期可能还有几个月的时间。

我想过两种基本的方法来做到这一点:

  1. 使用定期cron作业(例如,每五分钟左右)检查过去五分钟内是否发生了任何事件并执行我的方法。

  2. 使用celery和调度onEventElapsed使用eta将来要运行的参数(在模型save方法中)。

考虑选项 1,一个潜在的解决方案可能是django-celery-beat。但是,以固定的时间间隔运行任务以发送通知似乎有点奇怪。此外,我提出了一个(潜在的)问题,它(可能)会导致一个不太优雅的解决方案:

  • 每五分钟检查一次过去五分钟内发生的事件?似乎不稳定,也许有些事件被错过了(或者其他人的通知发送了两次?)。潜在的解决方法:向模型添加一个布尔字段,该字段设置为True一旦发送通知。

再说一次,选项 2 也有它的问题:

  • 手动处理事件开始/结束日期时间移动时的情况。使用时celery,必须存储taskID(easy, ofc) 并在日期更改后撤销任务并发出新任务。但我读过,芹菜在处理将来运行的任务时有(特定于设计的)问题:github 上的 Open Issue。我意识到这是如何发生的,以及为什么要解决这一切都是微不足道的。

现在,我遇到了一些可能会解决我的问题的库:

  • celery_longterm_scheduler(但这是否意味着我不能像以前那样使用 celery,因为不同的 Scheduler 类?这也与django-celery-beat...的可能用法有关。使用这两个框架中的任何一个,是否仍然可以排队作业(只是运行时间更长但不是几个月?)
  • django-apscheduler,使用apscheduler. …

python django celery apscheduler django-celery-beat

10
推荐指数
1
解决办法
593
查看次数

Python APScheduler - AsyncIOScheduler 如何工作?

我很难理解AsyncIOScheduler是如何工作的,它是如何非阻塞的?

如果我的工作正在执行阻塞功能,会AsyncIOScheduler阻塞吗?

如果我使用AsyncIOSchedulerwithThreadPoolExecutor呢?这是如何运作的?我可以等待作业执行吗?

python apscheduler

10
推荐指数
3
解决办法
5124
查看次数