我正在开发一个Django应用程序.我有一个API端点,如果需要,它必须执行一个必须重复几次的函数(直到某个条件为真).我现在如何处理它是 -
def shut_down(request):
# Do some stuff
while True:
result = some_fn()
if result:
break
time.sleep(2)
return True
Run Code Online (Sandbox Code Playgroud)
虽然我知道这是一个可怕的方法,我不应该阻止2秒,我无法弄清楚如何解决它.
在等了4秒之后,这有效.但我想要让循环在后台运行的东西,并在some_fn返回True时停止.(另外,some_fn肯定会返回True)
编辑 -
阅读Oz123的回复给了我一个似乎有效的想法.这就是我做的 -
def shut_down(params):
# Do some stuff
# Offload the blocking job to a new thread
t = threading.Thread(target=some_fn, args=(id, ), kwargs={})
t.setDaemon(True)
t.start()
return True
def some_fn(id):
while True:
# Do the job, get result in res
# If the job is done, return. Or sleep the thread for 2 seconds before trying again.
if res:
return
else:
time.sleep(2)
Run Code Online (Sandbox Code Playgroud)
这对我来说很重要.这很简单,但我不知道多线程与Django的结合效率如何.
如果有人能指出这方面的陷阱,那么批评就会受到赞赏.
Oz1*_*123 17
对于许多小项目来说,芹菜是矫枉过正的.对于那些您可以使用计划的项目,它非常容易使用.
使用此库,您可以使任何函数定期执行任务:
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
while True:
schedule.run_pending()
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
该示例以阻塞方式运行,但是如果查看常见问题解答,您会发现您还可以在并行线程中运行任务,这样您就不会阻塞,并且一旦不再需要就删除该任务:
from schedule import Scheduler
def run_continuously(self, interval=1):
"""Continuously run, while executing pending jobs at each elapsed
time interval.
@return cease_continuous_run: threading.Event which can be set to
cease continuous run.
Please note that it is *intended behavior that run_continuously()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you set a continuous run interval
of one hour then your job won't be run 60 times at each interval but
only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
self.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.setDaemon(True)
continuous_thread.start()
return cease_continuous_run
Scheduler.run_continuously = run_continuously
Run Code Online (Sandbox Code Playgroud)
以下是类方法中的用法示例:
def foo(self):
...
if some_condition():
return schedule.CancelJob # a job can dequeue it
# can be put in __enter__ or __init__
self._job_stop = self.scheduler.run_continuously()
logger.debug("doing foo"...)
self.foo() # call foo
self.scheduler.every(5).seconds.do(
self.foo) # schedule foo for running every 5 seconds
...
# later on foo is not needed any more:
self._job_stop.set()
...
def __exit__(self, exec_type, exc_value, traceback):
# if the jobs are not stop, you can stop them
self._job_stop.set()
Run Code Online (Sandbox Code Playgroud)
Tan*_*ett 11
这个答案稍微扩展了Oz123 的答案。
为了让事情正常工作,我创建了一个文件,称为mainapp/jobs.py包含我的预定作业。然后,在我的apps.py模块中,我放入from . import jobs了ready方法。这是我的整个apps.py文件:
from django.apps import AppConfig
import os
class MainappConfig(AppConfig):
name = 'mainapp'
def ready(self):
from . import jobs
if os.environ.get('RUN_MAIN', None) != 'true':
jobs.start_scheduler()
Run Code Online (Sandbox Code Playgroud)
(RUN_MAIN检查是因为python manage.py runserver该ready方法运行两次 -在两个进程中的每一个中运行一次 - 但我们只想运行一次。)
现在,这是我放在我的jobs.py文件中的内容。第一,进口。你需要进口Scheduler,threading且time如下。在F和UserHolding进口都只是我的工作做什么; 你不会导入这些。
from django.db.models import F
from schedule import Scheduler
import threading
import time
from .models import UserHolding
Run Code Online (Sandbox Code Playgroud)
接下来,编写要调度的函数。以下纯粹是一个例子;你的函数不会像这样。
def give_admin_gold():
admin_gold_holding = (UserHolding.objects
.filter(inventory__user__username='admin', commodity__name='gold'))
admin_gold_holding.update(amount=F('amount') + 1)
Run Code Online (Sandbox Code Playgroud)
接下来,schedule通过run_continuously向其Scheduler类添加方法来修补模块。使用以下代码执行此操作,该代码是从 Oz123 的答案中逐字复制的。
def run_continuously(self, interval=1):
"""Continuously run, while executing pending jobs at each elapsed
time interval.
@return cease_continuous_run: threading.Event which can be set to
cease continuous run.
Please note that it is *intended behavior that run_continuously()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you set a continuous run interval
of one hour then your job won't be run 60 times at each interval but
only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
self.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.setDaemon(True)
continuous_thread.start()
return cease_continuous_run
Scheduler.run_continuously = run_continuously
Run Code Online (Sandbox Code Playgroud)
最后,定义一个函数来创建一个Scheduler对象,连接你的工作,并调用调度程序的run_continuously方法。
def start_scheduler():
scheduler = Scheduler()
scheduler.every().second.do(give_admin_gold)
scheduler.run_continuously()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
9974 次 |
| 最近记录: |