我在烧瓶应用程序中使用apscheduler时遇到问题.
在我的view.py文件中,我写的是这样的
import time
from apscheduler.scheduler import Scheduler
def test_scheduler():
print "TEST"
print time.time()
sched = Scheduler()
sched.add_interval_job(test_scheduler, seconds=5)
sched.start()
Run Code Online (Sandbox Code Playgroud)
然后这个方法test_scheduler()每五秒执行两次
测试1360844314.01测试1360844314.2
我正在建立一个向访问者提供一些信息的网站.此信息通过每5秒轮询一对外部API在后台聚合.我现在的工作方式是我使用APScheduler工作.我最初更喜欢APScheduler,因为它使整个系统更容易移植(因为我不需要在新机器上设置cron作业).我启动轮询功能如下:
from apscheduler.scheduler import Scheduler
@app.before_first_request
def initialize():
apsched = Scheduler()
apsched.start()
apsched.add_interval_job(checkFirstAPI, seconds=5)
apsched.add_interval_job(checkSecondAPI, seconds=5)
apsched.add_interval_job(checkThirdAPI, seconds=5)
Run Code Online (Sandbox Code Playgroud)
这种方法有用,但它有一些问题:
RuntimeError('working outside of application context')
).总而言之,我觉得应该有更好的方法来运行这些重复的任务,但我不确定如何.有没有人有这个问题的有趣解决方案?欢迎所有提示!
[编辑]我刚刚阅读了Celery及其时间表.虽然我没有真正看到Celery与APScheduler的区别,以及它是否可以解决我的两点,但我想知道是否有人在阅读这篇文章时认为我应该在Celery中进行更多调查?
[结论]大约两年后,我正在读这篇文章,我想我可以让你们知道我最终得到了什么.我认为@BluePeppers说我不应该与Flask生态系统紧密联系.所以我选择使用Ansible设置的每分钟运行的常规cron-jobs.虽然这使它变得有点复杂(我需要学习Ansible并转换一些代码以便每分钟运行它就足够了)我认为这更加强大.我目前正在使用令人敬畏的pythonr-rq来排队同步作业(检查API和发送电子邮件).我刚刚发现了rq-scheduler.我还没有测试过,但它似乎正是我所需要的.所以这可能是未来读者对这个问题的一个提示.
其余的,我希望你们大家度过美好的一天!
我们有一个用金字塔制作的网络应用程序,并通过gunicorn + nginx提供服务.它适用于8个工作线程/进程
我们需要工作,我们选择了apscheduler.这是我们如何推出它
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.scheduler import Scheduler
rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
seconds=JOB_INTERVAL)
Run Code Online (Sandbox Code Playgroud)
问题是gunicorn的所有工作进程都选择了调度程序.我们尝试实现文件锁,但它似乎不是一个足够好的解决方案.什么是最好的方法来确保在任何给定的时间只有一个工作进程选择预定的事件,没有其他线程选择到下一个JOB_INTERVAL
?
如果我们决定稍后切换到apache2 + modwsgi,解决方案甚至需要使用mod_wsgi.它需要与作为服务员的单进程开发服务器一起工作.
我正面临OP所描述的相同问题,只需使用Django应用程序.我最有把握的是,如果原始问题,这个细节不会有太大变化.出于这个原因,为了获得更多的可见性,我还标记了这个问题django
.
我有一个异步功能,需要每隔N分钟使用apscheduller运行一次.下面有一个python代码
URL_LIST = ['<url1>',
'<url2>',
'<url2>',
]
def demo_async(urls):
"""Fetch list of web pages asynchronously."""
loop = asyncio.get_event_loop() # event loop
future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
loop.run_until_complete(future) # loop until done
async def fetch_all(urls):
tasks = [] # dictionary of start times for each url
async with ClientSession() as session:
for url in urls:
task = asyncio.ensure_future(fetch(url, session))
tasks.append(task) # create list of tasks
_ = await asyncio.gather(*tasks) # gather task responses
async def fetch(url, session):
"""Fetch a …
Run Code Online (Sandbox Code Playgroud) 我正在使用Python apscheduler(版本3.0.1)每秒执行一个函数
代码:
scheduler = BackgroundScheduler()
scheduler.add_job(runsync, 'interval', seconds=1)
scheduler.start()
Run Code Online (Sandbox Code Playgroud)
它大部分时间都很好,但有时我会收到这个警告:
WARNING:apscheduler.scheduler:Execution of job "runsync (trigger: interval[0:00:01], next run at: 2015-12-01 11:50:42 UTC)" skipped: maximum number of running instances reached (1)
Run Code Online (Sandbox Code Playgroud)
1.这是执行此方法的正确方法吗?
这个警告意味着什么?它是否会影响函数内部的任务执行?
3.如何处理这个?
我想用Flask-APScheduler运行一个查询Flask-SQLAlchemy模型的工作.当工作开始时,我明白了RuntimeError: application not registered on db instance and no application bound to current context
.如何运行查询数据库的作业.
from flask_apscheduler import APScheduler
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
Run Code Online (Sandbox Code Playgroud)
from models import User
def my_job():
user = User.query.first()
print(user)
Run Code Online (Sandbox Code Playgroud)
查询期间发生错误,然后才能打印.数据库正在其他应用程序中工作以进行其他查询.
我试图with app.app_context():
在设置扩展时添加,但这不起作用.
with app.app_context()
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
Run Code Online (Sandbox Code Playgroud)
完整的追溯是:
ERROR:apscheduler.executors.default:Job "run_in (trigger: interval[0:00:10], next run at: 2016-10-18 23:00:53 CEST)" raised an exception
Traceback (most recent call last):
File "/Users/user/.virtualenvs/myfolder/lib/python2.7/site-packages/apscheduler/executors/base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "/Users/user/Documents/myfolder/myfolder/myfile.py", line 19, …
Run Code Online (Sandbox Code Playgroud) 我有一个烧瓶应用程序,我需要APScheduler的调度功能.问题是:
我从哪里开始调度程序实例?
我使用uwsgi + nginx为多个工作者提供这个应用程序,我不会最终得到多个彼此无视的调度程序实例吗?如果这是正确的,单个作业将被多次触发,不是吗?
在这种情况下,最好的策略是什么,所以我最终只得到一个Scheduler实例,并且仍然可以从预定的作业中访问应用程序的上下文?
这个问题虽然有枪炮而不是uwsgi,但也有同样的问题,但答案可能类似.
下面是将"app"定义为uwsgi可调用应用程序对象的代码.包含此代码的文件称为wsgi.py(并不重要).
app = create_app(config=ProductionConfig())
def job_listener(event):
get_ = "msg from job '%s'" % (event.job)
logging.info(get_)
# This code below never gets invoked when I check with worker_id() == 1
# The only time it is run is with worker_id() value of 0
app.sched = Scheduler()
app.sched.add_jobstore(ShelveJobStore('/tmp/apsched_%d' % uwsgi.worker_id()), 'file')
app.sched.add_listener(job_listener,
events.EVENT_JOB_EXECUTED |
events.EVENT_JOB_MISSED |
events.EVENT_JOB_ERROR)
app.sched.start()
Run Code Online (Sandbox Code Playgroud) 我正在运行一个 BlockingScheduler 进程,它应该运行多个cron
作业,但每次都无法运行并显示以下消息:
Run time of job "validation (trigger: cron[hour='3'], next run at: 2016-12-30 03:00:00 CST)" was missed by 0:00:02.549821
我有以下设置:
sched = BlockingScheduler(misfire_grace_time=3600, coalesce=True)
sched.add_jobstore('mongodb', collection='my_jobs')
@sched.scheduled_job('cron', hour=3, id='validation')
def validation():
rep = Myclass()
rep.run()
if __name__ == '__main__':
sched.start()
Run Code Online (Sandbox Code Playgroud)
我认为添加misfire_grace_time
可以解决问题,但每项工作仍然无法运行。
假设我有一个模型Event
。我想在活动结束后向所有受邀用户发送通知(电子邮件、推送等)。类似的东西:
class Event(models.Model):
start = models.DateTimeField(...)
end = models.DateTimeField(...)
invited = models.ManyToManyField(model=User)
def onEventElapsed(self):
for user in self.invited:
my_notification_backend.sendMessage(target=user, message="Event has elapsed")
Run Code Online (Sandbox Code Playgroud)
现在,当然,关键部分是调用onEventElapsed
when timezone.now() >= event.end
。请记住,end
距离当前日期可能还有几个月的时间。
我想过两种基本的方法来做到这一点:
使用定期cron
作业(例如,每五分钟左右)检查过去五分钟内是否发生了任何事件并执行我的方法。
使用celery
和调度onEventElapsed
使用eta
将来要运行的参数(在模型save
方法中)。
考虑选项 1,一个潜在的解决方案可能是django-celery-beat
。但是,以固定的时间间隔运行任务以发送通知似乎有点奇怪。此外,我提出了一个(潜在的)问题,它(可能)会导致一个不太优雅的解决方案:
True
一旦发送通知。再说一次,选项 2 也有它的问题:
celery
,必须存储taskID
(easy, ofc) 并在日期更改后撤销任务并发出新任务。但我读过,芹菜在处理将来运行的任务时有(特定于设计的)问题:github 上的 Open Issue。我意识到这是如何发生的,以及为什么要解决这一切都是微不足道的。现在,我遇到了一些可能会解决我的问题的库:
django-celery-beat
...的可能用法有关。使用这两个框架中的任何一个,是否仍然可以排队作业(只是运行时间更长但不是几个月?)apscheduler
. …我很难理解AsyncIOScheduler是如何工作的,它是如何非阻塞的?
如果我的工作正在执行阻塞功能,会AsyncIOScheduler
阻塞吗?
如果我使用AsyncIOScheduler
withThreadPoolExecutor
呢?这是如何运作的?我可以等待作业执行吗?