调度Python脚本以准确地每小时运行一次

sun*_*tty 48 python cron scheduled-tasks cron-task python-3.x

在我问之前,Cron Jobs和Task Scheduler将是我的最后选择,这个脚本将在Windows和Linux之间使用,我更愿意使用编码方法来完成此操作,而不是将其留给最终用户完成.

是否有可用于安排任务的Python库?我需要每小时运行一次函数,但是,如果我每小时运行一次脚本并使用.sleep,"每小时一次"将在前一天的不同时间运行,因为延迟执行/运行脚本和/或功能所固有的.

使用Cron作业或使用任务计划程序进行计划的情况下,安排函数在一天中的特定时间(不止一次)运行的最佳方法是什么?

或者,如果这是不可能的,我也希望你的意见.

AP Scheduler完全符合我的需求.

版本<3.0

import datetime
import time
from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.daemonic = False
sched.start()

def job_function():
    print("Hello World")
    print(datetime.datetime.now())
    time.sleep(20)

# Schedules job_function to be run once each minute
sched.add_cron_job(job_function,  minute='0-59')
Run Code Online (Sandbox Code Playgroud)

出:

>Hello World
>2014-03-28 09:44:00.016.492
>Hello World
>2014-03-28 09:45:00.0.14110
Run Code Online (Sandbox Code Playgroud)

版本> 3.0

(来自Animesh Pandey的答案如下)

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')

sched.configure(options_from_ini_file)
sched.start()
Run Code Online (Sandbox Code Playgroud)

Unk*_*own 52

也许这可以帮助:高级Python调度程序

这是他们文档中的一小段代码:

from apscheduler.schedulers.blocking import BlockingScheduler

def some_job():
    print "Decorated job"

scheduler = BlockingScheduler()
scheduler.add_job(some_job, 'interval', hours=1)
scheduler.start()
Run Code Online (Sandbox Code Playgroud)

  • 这适用于apscheduler版本<3.x有人可以给我们这样一个简单的例子但是有3.x版本吗? (4认同)
  • 这是我的问题的答案,它看起来很简单,易于使用.我在回复中添加了一些测试代码.再次感谢. (3认同)
  • 此语法不再是最新的 (3认同)
  • 好的,所以调度程序在我从 DOS 运行命令 `python test_scheduler.py` 后启动,那么我如何停止它?只能按Ctrl+C?有没有办法让调度程序在运行 100 次后停止?或者在特定时间说 18:00?你如何在脚本中指定它。我搜索但发现没有人提到如何阻止它。 (2认同)

Sha*_*ies 23

每小时每10分钟运行一次.

from datetime import datetime, timedelta

while 1:
    print 'Run something..'

    dt = datetime.now() + timedelta(hours=1)
    dt = dt.replace(minute=10)

    while datetime.now() < dt:
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

  • 这是基于 apscheduler 的代码的替代方案。适用于树莓派。 (3认同)

Ani*_*dey 18

对于apscheduler<3.0,请参阅未知的答案.

对于apscheduler> 3.0

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')

sched.configure(options_from_ini_file)
sched.start()
Run Code Online (Sandbox Code Playgroud)

更新:

apscheduler 文件.

这对apscheduler-3.3.1Python 3.6.2.

"""
Following configurations are set for the scheduler:

 - a MongoDBJobStore named “mongo”
 - an SQLAlchemyJobStore named “default” (using SQLite)
 - a ThreadPoolExecutor named “default”, with a worker count of 20
 - a ProcessPoolExecutor named “processpool”, with a worker count of 5
 - UTC as the scheduler’s timezone
 - coalescing turned off for new jobs by default
 - a default maximum instance limit of 3 for new jobs
"""

from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

"""
Method 1:
"""
jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

"""
Method 2 (ini format):
"""
gconfig = {
    'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
}

sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format


@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')


@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')


sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()

sched_method2.configure(gconfig=gconfig)
sched_method2.start()
Run Code Online (Sandbox Code Playgroud)

  • 这条线是什么意思?`sched.configure(options_from_ini_file)`什么是ini_file,你如何配置它? (4认同)

小智 9

我可以建议的最简单的选择是使用时间表库。

在您的问题中,您说“我需要每小时运行一次函数”执行此操作的代码非常简单:

    import schedule

    def thing_you_wanna_do():
        ...
        ...
        return


    schedule.every().hour.do(thing_you_wanna_do)

    while True:
        schedule.run_pending()
Run Code Online (Sandbox Code Playgroud)

你还询问了如何在一天中的某个时间做某事,一些如何做到这一点的例子是:

    import schedule


    def thing_you_wanna_do():
        ...
        ...
        return


    schedule.every().day.at("10:30").do(thing_you_wanna_do)
    schedule.every().monday.do(thing_you_wanna_do)
    schedule.every().wednesday.at("13:15").do(thing_you_wanna_do)
    # If you would like some randomness / variation you could also do something like this
    schedule.every(1).to(2).hours.do(thing_you_wanna_do)

    while True:
        schedule.run_pending()
Run Code Online (Sandbox Code Playgroud)

90% 使用的代码是调度库的示例代码 。快乐调度!