如果我有一个定义如下的函数:
def add(x,y):
return x+y
Run Code Online (Sandbox Code Playgroud)
有没有办法动态添加这个函数作为芹菜PeriodicTask并在运行时启动它?我希望能够做类似(伪代码)的事情:
some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)
我还想用类似(伪代码)的东西动态地停止或删除该任务:
celery.beat.remove_task(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)
要么
celery.beat.stop(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)
仅供参考我没有使用djcelery,它允许您通过django管理员管理定期任务.
我在烧瓶应用程序中使用apscheduler时遇到问题.
在我的view.py文件中,我写的是这样的
import time
from apscheduler.scheduler import Scheduler
def test_scheduler():
print "TEST"
print time.time()
sched = Scheduler()
sched.add_interval_job(test_scheduler, seconds=5)
sched.start()
Run Code Online (Sandbox Code Playgroud)
然后这个方法test_scheduler()每五秒执行两次
测试1360844314.01测试1360844314.2
我有一个烧瓶应用程序,我需要APScheduler的调度功能.问题是:
我从哪里开始调度程序实例?
我使用uwsgi + nginx为多个工作者提供这个应用程序,我不会最终得到多个彼此无视的调度程序实例吗?如果这是正确的,单个作业将被多次触发,不是吗?
在这种情况下,最好的策略是什么,所以我最终只得到一个Scheduler实例,并且仍然可以从预定的作业中访问应用程序的上下文?
这个问题虽然有枪炮而不是uwsgi,但也有同样的问题,但答案可能类似.
下面是将"app"定义为uwsgi可调用应用程序对象的代码.包含此代码的文件称为wsgi.py(并不重要).
app = create_app(config=ProductionConfig())
def job_listener(event):
get_ = "msg from job '%s'" % (event.job)
logging.info(get_)
# This code below never gets invoked when I check with worker_id() == 1
# The only time it is run is with worker_id() value of 0
app.sched = Scheduler()
app.sched.add_jobstore(ShelveJobStore('/tmp/apsched_%d' % uwsgi.worker_id()), 'file')
app.sched.add_listener(job_listener,
events.EVENT_JOB_EXECUTED |
events.EVENT_JOB_MISSED |
events.EVENT_JOB_ERROR)
app.sched.start()
Run Code Online (Sandbox Code Playgroud) 我有一个使用 uwsgi(有 10 个工人)+ ngnix 运行的 django 应用程序。我正在使用 apscheduler 进行调度。每当我安排一项工作时,它都会被多次执行。从这些答案ans1,ans2我了解到这是因为调度程序是在uwsgi 的每个工作人员中启动的。我通过按照本答案中的建议将调度程序绑定到套接字并在数据库中保持状态来对调度程序进行有条件的初始化,以便仅启动一个调度程序实例,但仍然存在相同的问题,有时在创建时也存在发现调度程序未运行的作业并且该作业保持挂起且未执行。
我正在使用以下代码在 django 应用程序的 url 中初始化 apscheduler。这将在应用程序启动时启动调度程序。
def job_listener(ev):
print('event',ev)
job_defaults = {
'coalesce': True,
'max_instances': 1
}
scheduler = BackgroundScheduler(job_defaults=job_defaults, timezone=TIME_ZONE, daemon=False)
scheduler.add_jobstore(MongoDBJobStore(client=client), 'default')
scheduler.add_executor(ThreadPoolExecutor(), 'default')
scheduler.add_executor(ProcessPoolExecutor(),'processpool')
scheduler.add_listener(job_listener)
def initialize_scheduler():
try:
if scheduler_db_conn.find_one():
print('scheduler already running')
return True
scheduler.start()
scheduler_db_conn.save({'status': True})
print('---------------scheduler started --------------->')
return True
except:
return False
Run Code Online (Sandbox Code Playgroud)
我使用以下代码来创建作业。
from scheduler_conf import scheduler
def create_job(arg_list):
try:
print('scheduler status-->',scheduler.running)
job = …Run Code Online (Sandbox Code Playgroud)