我在Flask应用程序中有一个APScheduler,每隔一段时间发送一次事件.
现在我需要"刷新"所有作业,实际上如果它们在没有触及定义的间隔的情况下不运行就会立即启动它们.
我试着调用job.pause()然后调用job.resume(),没有任何东西,并使用job.reschedule_job(...)会触发它,但也会改变我不想要的间隔...
我的实际代码如下:
cron = GeventScheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()
cron.add_job(_job_time, 'interval', seconds=5, id='_job_time')
cron.add_job(_job_forecast, 'interval', hours=1, id='_job_forecast_01')
@app.route("/refresh")
def refresh():
refreshed = []
for job in cron.get_jobs():
job.pause()
job.resume()
refreshed.append(job.id)
return json.dumps( {'number': len(cron.get_jobs()), 'list': refreshed} )
Run Code Online (Sandbox Code Playgroud) 我正在电报中编写一个Python机器人,但我有一个错误未解决,错误在时间表中
Traceback (most recent call last):
File "C:\Users\vini6\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\apscheduler\schedulers\base.py", line 979, in _process_jobs
executor.submit_job(job, run_times)
File "C:\Users\vini6\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\apscheduler\executors\base.py", line 71, in submit_job
self._do_submit_job(job, run_times)
File "C:\Users\vini6\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\apscheduler\executors\pool.py", line 28, in _do_submit_job
f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.9_3.9.2544.0_x64__qbz5n2kfra8p0\lib\concurrent\futures\thread.py", line 169, in submit
raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
Run Code Online (Sandbox Code Playgroud) 我有一个简单的要求.我正在运行apscheduler作为一个单独的进程.我有另一个jobproducer脚本,我希望将作业添加到调度程序并运行它.
这是我的调度程序代码,
# appsched.py
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start()
Run Code Online (Sandbox Code Playgroud)
这是我的作业制作人脚本,
# jobproducer.py
from appsched import scheduler
def say_hello_job():
print "Hello"
scheduler.add_job(say_hello_job, 'interval', minutes=1)
Run Code Online (Sandbox Code Playgroud)
不用说,这不起作用.有没有办法通过使用jobstore来完成这项工作?如何与多个不同的职位生产者共享调度程序?
我用“max_instances = 10”设置调度程序。可以同时运行10个作业。有时有些作业被阻塞,它会挂在那里。当超过10个作业在那里阻塞时,“跳过:运行的最大数量”的例外实例达到(10)”。APScheduler 是否有办法设置作业持续时间的最长时间。如果作业运行超过最长时间,它将被终止。如果没有办法,我该怎么办?
问候!
我正在尝试使用PyInstaller进行构建.配置:Python 3.6.5
pip 10.0.1
,操作系统:Ubuntu 18.04
.使用virtualenv
(也试过python -m venv
).
我的应用程序使用apscheduler
,websocket
,_thread
和它看起来像一些相关的模块有进口的问题.试过pyinstaller --onefile mymain.spec
&pyinstaller --onedir mymain.spec
.两种情况都存在问题.如果没有冻结,程序可以正常工作.
这是我尝试运行生成的可执行文件时得到的错误:
Traceback (most recent call last):
File "apscheduler/schedulers/base.py", line 882, in _create_plugin_instance
KeyError: 'interval'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "cmonitorcli/services/socket_client.py", line 70, in run
File "cmonitorcli/services/scheduler.py", line 36, in add_update_job
File "apscheduler/schedulers/base.py", line 413, in add_job
File …
Run Code Online (Sandbox Code Playgroud) 启动调度程序会出现此错误.正如我在提交中看到的那样,这段代码仅在3天前添加.所以我在这里遗漏了一些东西或它的错误?
In /local/lib/python2.7/site-packages/apscheduler/__init__.py in <module>()
1) # These will be removed in APScheduler 4.0.
2) **parsed_version =__import__('pkg_resources').get_distribution('APScheduler').parsed_version
3) version_info =
tuple(int(x) if x.isdigit() else x for x in
parsed_version.public.split('.'))**
4) version = parsed_version.base_version
5) release = __version__ = parsed_version.public
**AttributeError: 'tuple' object has no attribute 'public'**
Run Code Online (Sandbox Code Playgroud) 我正在尝试创建一个可以按以下 cron 计划运行作业的进程,0/5 8-17 * * 1-5
这是我的测试代码:
import argparse
from apscheduler.schedulers.background import BackgroundScheduler
import datetime
import time
cmdline_parser = argparse.ArgumentParser(description='Testing')
cmdline_parser.add_argument('--interval', type=int, default=5)
def task():
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
print(f'{now}')
if __name__=='__main__':
args = cmdline_parser.parse_args()
sched = BackgroundScheduler(timezone='EST')
sched.start()
minutes_interval = f'0/{args.interval}'
sched.add_job(task, trigger='cron', day_of_week='mon-fri', hour='8-17', minute=minutes_interval)
while True:
time.sleep(30)
Run Code Online (Sandbox Code Playgroud)
但它不会在下午 5 点后停止。如果我错误地使用了 cron 参数,请帮忙。
我试图每天在精确的时间运行一些东西。我尝试过 Schedule 并使用了 1 秒的睡眠时间,但有时它会运行两次,所以我想切换到 APScheduler。但我以前从未使用过任何类似 Cron 的东西,而且他们网页上的“用户指南”根本不像详细的文档,所以我不知道从哪里开始。谷歌搜索但只有间隔的答案。我想指定一个时间而不是间隔。
例如,每天 03:00:05 运行一个函数。你会怎么做?
谢谢。
我正在尝试使用 asyncioscheduler 计划作业运行 python 异步应用程序,但 APScheduler 在构建过程中由于以下错误而失败:
“仅支持 pytz 库中的时区”错误
我确实在我的应用程序中包含 pytz 并且我正在传递时区。是什么导致了错误?
我在创建作业管理器的类中调用 asyncioscheduler:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
class ScheduleManager:
def __init__(self) -> None:
self.scheduler = AsyncIOScheduler()
def start(self):
self.scheduler.start()
def stop(self):
self.scheduler.shutdown()
def add_seconds_interval_job(self, callback, interval : int):
self.scheduler.add_job(callback, 'interval', seconds = interval)
def add_minutes_interval_job(self, callback, interval : int):
self.scheduler.add_job(callback, 'interval', minutes = interval)
def add_hours_interval_job(self, callback, interval : int):
self.scheduler.add_job(callback, 'interval', hours = interval)
def add_days_interval_job(self, callback, interval : int):
self.scheduler.add_job(callback, 'interval', days = interval)
Run Code Online (Sandbox Code Playgroud)
然后我从我的应用程序中调用该经理,如下所示:
from …
Run Code Online (Sandbox Code Playgroud) from fastapi import FastAPI\nfrom fastapi.middleware.cors import CORSMiddleware\nimport uvicorn\nimport time\nfrom loguru import logger\nfrom apscheduler.schedulers.background import BackgroundScheduler\n\napp = FastAPI()\napp.add_middleware(\n CORSMiddleware,\n allow_origins=["*"],\n allow_credentials=True,\n allow_methods=["*"],\n allow_headers=["*"],\n)\n\ntest_list = ["1"]*10\n\ndef check_list_len():\n global test_list\n while True:\n time.sleep(5)\n logger.info(f"check_list_len\xef\xbc\x9a{len(test_list)}")\n\n@app.on_event('startup')\ndef init_data():\n scheduler = BackgroundScheduler()\n scheduler.add_job(check_list_len, 'cron', second='*/5')\n scheduler.start()\n\n@app.get("/pop")\nasync def list_pop():\n global test_list\n test_list.pop(1)\n logger.info(f"current_list_len:{len(test_list)}")\n\n\nif __name__ == '__main__':\n uvicorn.run(app="main3:app", host="0.0.0.0", port=80, reload=False, debug=False)\n
Run Code Online (Sandbox Code Playgroud)\n上面是我的代码,我想通过get请求取出一个列表元素,并设置一个周期性任务不断检查列表中的元素数量,但是当我运行时,总是出现以下错误:
\nExecution of job "check_list_len (trigger: cron[second='*/5'], next run at: 2021-11-25 09:48:50 CST)" skipped: maximum number of running instances reached (1)\n2021-11-25 09:48:50.016 | INFO | …
Run Code Online (Sandbox Code Playgroud) apscheduler ×10
python ×8
python-3.x ×2
fastapi ×1
pyinstaller ×1
pytz ×1
telegram ×1
timezone ×1
uvicorn ×1
windows ×1