如何使用APScheduler安排间隔作业?

Tro*_*roy 5 python apscheduler

我正在尝试使用APScheduler(v3.0.0)安排间隔作业。

我试过了:

from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()

def my_interval_job():
    print 'Hello World!'
sched.add_job(my_interval_job, 'interval', seconds=5)
sched.start()
Run Code Online (Sandbox Code Playgroud)

from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()

@sched.scheduled_job('interval', id='my_job_id', seconds=5)
def my_interval_job():
    print 'Hello World!'
sched.start()
Run Code Online (Sandbox Code Playgroud)

要么按照文档工作,但工作永远不会解雇...


更新:
事实证明还有其他与环境相关的问题,导致任务无法运行。今天早上,此任务运行良好,而昨天起未对代码进行任何修改。


更新2:
经过进一步测试,我发现“间隔”作业通常看起来很不稳定...上面的代码现在可以在我的开发环境中使用,但是当我部署到临时环境时不起作用(我正在使用heroku应用程序)。我还有其他apscheduler的“ cron”工作,它们在舞台环境/生产环境中都可以正常工作。

当我为“ apscheduler.schedulers”记录器打开调试日志记录时,该日志指示添加了间隔作业:

添加的作业“my_cron_job1”求职商店“默认”
增加就业“my_cron_job2”求职商店“默认”
增加就业“my_interval_job”求职商店“默认”
计划开始
试探性地增加就业-它会得到妥善安排,当调度开始
增加暂定作业-计划程序启动时将正确地安排它以
查找要运行的作业
下一次唤醒应于2015-03-24 15:05:00-07:00(在254.210542秒中

将间隔作业设置为5秒后,下一次唤醒应如何从现在开始254秒??

小智 7

您需要保持线程处于活动状态。这是我如何使用它的示例。

from subprocess import call

import time
import os

from apscheduler.schedulers.background import BackgroundScheduler


def job():
    print("In job")
    call(['python', 'scheduler/main.py'])


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.configure(timezone=utc)
    scheduler.add_job(job, 'interval', seconds=10)
    scheduler.add
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(5)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
Run Code Online (Sandbox Code Playgroud)


Tro*_*roy 5

我还没有弄清楚是什么导致了最初的问题,但我通过交换作业安排的顺序来解决这个问题,以便在“cron”作业之前安排“间隔”作业。

即我从这个切换:

def my_cron_job1():
    print "cron job 1"

def my_cron_job2():
    print "cron job 2"

def my_interval_job():
    print "interval job"

if __name__ == '__main__':
    from apscheduler.schedulers.blocking import BlockingScheduler
    sched = BlockingScheduler(timezone='MST')

    sched.add_job(my_cron_job1, 'cron', id='my_cron_job1', minute=10)
    sched.add_job(my_cron_job2, 'cron', id='my_cron_job2', minute=20)

    sched.add_job(my_interval_job, 'interval', id='my_job_id', seconds=5)
Run Code Online (Sandbox Code Playgroud)

对此:

def my_cron_job1():
    print "cron job 1"

def my_cron_job2():
    print "cron job 2"

def my_interval_job():
    print "interval job"

if __name__ == '__main__':
    from apscheduler.schedulers.blocking import BlockingScheduler
    sched = BlockingScheduler(timezone='MST')

    sched.add_job(my_interval_job, 'interval', id='my_job_id', seconds=5)

    sched.add_job(my_cron_job1, 'cron', id='my_cron_job1', minute=10)
    sched.add_job(my_cron_job2, 'cron', id='my_cron_job2', minute=20)
Run Code Online (Sandbox Code Playgroud)

现在,cron 作业和间隔作业在两种环境中都可以正常运行。

  • 我建议不要指定那样的时区。我只是帮助解决了开发人员指定了 EST 时区的夏令时问题。切换到“美国/纽约”反而解决了这个问题。总是更喜欢基于语言环境的时区名称,因为它们永远不会有歧义。 (2认同)

Ale*_*olm 0

该文档有一个错误。我现在已经修好了。第一行应该是:

from apscheduler.schedulers.blocking import BlockingScheduler
Run Code Online (Sandbox Code Playgroud)

虽然它会引发 ImportError,但您没有提及任何错误。您尝试过所提供的示例吗?