sun*_*tty 48 python cron scheduled-tasks cron-task python-3.x
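Before I ask: Cron jobs and Task Scheduler will be my last resort. This script will be used on both Windows and Linux, and I would prefer a coded approach rather than leaving the scheduling to the end user.
Is there a Python library I can use to schedule tasks? I need to run a function once an hour. However, if I run the script hourly and rely on .sleep, "once an hour" will drift to a different part of the hour from one day to the next, because of the delay inherent in executing the script and/or function.
What is the best way to schedule a function to run at specific times of day (more than once) without using a cron job or Task Scheduler?
Or, if this is not possible, I would like your input as well.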
import datetime
import time
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.daemonic = False
sched.start()
def job_function():
    print("Hello World")
    print(datetime.datetime.now())
    time.sleep(20)
# Schedules job_function to be run once each minute
sched.add_cron_job(job_function, minute='0-59')
Output:
>Hello World
>2014-03-28 09:44:00.016.492
>Hello World
>2014-03-28 09:45:00.0.14110
(From Animesh Pandey's answer below)
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')
# options_from_ini_file is a placeholder for a settings dict; it is not defined in this snippet
sched.configure(options_from_ini_file)
sched.start()
Unk*_*own 52
Maybe this can help: Advanced Python Scheduler
Here is a small piece of code from their documentation:
from apscheduler.schedulers.blocking import BlockingScheduler
def some_job():
    print("Decorated job")
scheduler = BlockingScheduler()
# Run some_job once every hour
scheduler.add_job(some_job, 'interval', hours=1)
scheduler.start()
Sha*_*ies 23
To run something at 10 minutes past every hour:
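import time
from datetime import datetime, timedelta
while True:
    print('Run something..')
    # Next run: 10 minutes past the following hour, on the exact minute
    dt = datetime.now() + timedelta(hours=1)
    dt = dt.replace(minute=10, second=0, microsecond=0)
    while datetime.now() < dt:
        time.sleep(1)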
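The loop above waits for a wall-clock target instead of sleeping a fixed hour, which is what keeps the run time from drifting. As a rough sketch of the same idea in reusable form (the names `next_run_at` and `run_hourly`, and the injectable `clock` and `sleep` parameters, are my own and only there for illustration):

```python
import time
from datetime import datetime, timedelta


def next_run_at(now, minute=10):
    """Return the next moment strictly after `now` that is `minute` past an hour."""
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # This hour's slot has already passed, so use the next hour's slot.
        candidate += timedelta(hours=1)
    return candidate


def run_hourly(job, minute=10, iterations=None, clock=datetime.now, sleep=time.sleep):
    """Call `job` at `minute` past each hour; iterations=None loops forever."""
    count = 0
    while iterations is None or count < iterations:
        # The target is recomputed from the clock on every pass,
        # so the time spent inside `job` does not accumulate as drift.
        target = next_run_at(clock(), minute)
        remaining = (target - clock()).total_seconds()
        if remaining > 0:
            sleep(remaining)
        job()
        count += 1


if __name__ == "__main__":
    print("Next run would be at:", next_run_at(datetime.now()))
```

This assumes naive local datetimes and does nothing special for daylight-saving changes. Calling `run_hourly(my_function)` blocks forever, so it would go at the end of the script or in its own thread.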
Ani*_*dey 18
For apscheduler < 3.0, see Unknown's answer.
For apscheduler > 3.0:
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')
# options_from_ini_file is a placeholder for a settings dict; it is not defined in this snippet
sched.configure(options_from_ini_file)
sched.start()
See the apscheduler documentation.
This is for apscheduler-3.3.1 on Python 3.6.2.
"""
Following configurations are set for the scheduler:
- a MongoDBJobStore named “mongo”
- an SQLAlchemyJobStore named “default” (using SQLite)
- a ThreadPoolExecutor named “default”, with a worker count of 20
- a ProcessPoolExecutor named “processpool”, with a worker count of 5
- UTC as the scheduler’s timezone
- coalescing turned off for new jobs by default
- a default maximum instance limit of 3 for new jobs
"""
from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
"""
Method 1:
"""
jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
"""
Method 2 (ini format):
"""
gconfig = {
    'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
}
sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format
@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')
@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')
# Note: BlockingScheduler.start() blocks the calling thread, so in a single
# script the second scheduler is only reached after the first one shuts down.
# Try the two methods one at a time.
sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()
sched_method2.configure(gconfig=gconfig)
sched_method2.start()
小智 9
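The simplest option I can suggest is the schedule library.
In your question you said "I need to run a function once an hour". The code to do this is very simple: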
import time
import schedule
def thing_you_wanna_do():
    ...
    ...
    return
schedule.every().hour.do(thing_you_wanna_do)
while True:
    schedule.run_pending()
    # Sleep briefly so the loop does not spin at 100% CPU
    time.sleep(1)
You also asked how to do something at a certain time of day. Some examples of how to do that:
import time
import schedule
def thing_you_wanna_do():
    ...
    ...
    return
schedule.every().day.at("10:30").do(thing_you_wanna_do)
schedule.every().monday.do(thing_you_wanna_do)
schedule.every().wednesday.at("13:15").do(thing_you_wanna_do)
# If you would like some randomness / variation you could also do something like this
schedule.every(1).to(2).hours.do(thing_you_wanna_do)
while True:
    schedule.run_pending()
    # Sleep briefly so the loop does not spin at 100% CPU
    time.sleep(1)
90% of the code used here is the example code from the schedule library. Happy scheduling!
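If you would rather not add a dependency, the "several fixed times a day" part can be approximated with the standard library alone. This is only a minimal sketch, not how the schedule library works internally: `next_daily_run` is a hypothetical helper name, and it assumes naive local datetimes with no daylight-saving handling.

```python
from datetime import datetime, time as dtime, timedelta


def next_daily_run(now, times_of_day):
    """Return the earliest datetime strictly after `now` that matches one of `times_of_day`."""
    candidates = []
    for slot in times_of_day:
        candidate = datetime.combine(now.date(), slot)
        if candidate <= now:
            # Today's slot has already passed, so use the same slot tomorrow.
            candidate += timedelta(days=1)
        candidates.append(candidate)
    return min(candidates)


if __name__ == "__main__":
    slots = [dtime(10, 30), dtime(13, 15)]
    print("Next run would be at:", next_daily_run(datetime.now(), slots))
```

A loop would then sleep until the returned datetime, call the function, and ask for the next slot again. Because each target comes from the clock rather than from a fixed delay, the run times should not creep.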