Sun*_* Ha 4 celery flask python-watchdog
我正在寻找带有看门狗观察器的基于烧瓶的 Web 应用程序的示例。更具体地说,我想使用看门狗观察器来检测预定义目录中的任何更改,并根据更改更新 Web 应用程序。我可以为它们中的每一个找到许多示例,即基于烧瓶的 Web 应用程序和看门狗观察者示例。
但是,我不知道如何集成两个示例并顺利运行它们。谁能提供一个简单的例子?
另外,我想知道我是否可以用 Celery 工人运行看门狗观察者?
谢谢
编辑:我使用芹菜工人来运行看门狗观察者来观察目录及其子目录,如下所示:
@celery.task(bind=True)
def _watcher(self):
observer = Observer()
handler = MyHandler()
observer.schedule(handler, '.')
observer.start()
try:
while True:
if not handler.event_q.empty():
event, ts = handler.event_q.get()
self.update_state(state='PROGRESS', meta={'src_path': event.src_path, 'event_type': event.event_type})
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
return {'src_path': 'srcpath', 'event_type': 'eventtype'}
Run Code Online (Sandbox Code Playgroud)
然后,从前端,每隔 1 秒,它调用 GET 函数来更新任何更改(如果有)。这有点hacky。
我最终想要实现的是 1) 继续观察目录及其子目录,2) 如果有任何更改,则根据更改更新数据库以及 3) 根据更改更新前端。
到目前为止,我可以使用看门狗(上面代码中的 MyHandler 类)根据文件系统中的更改更新数据库。但是,我仍然在寻找更好的解决方案来观察 Flask 框架内的变化并更新前端的变化。
当启用调试模式时,Flask (Werkzeug) 已经为开发服务器集成了看门狗。您唯一需要做的就是安装看门狗:
$ pip install watchdog
Run Code Online (Sandbox Code Playgroud)
更多信息:
我最终使用了多线程。
from flask import Flask, render_template, jsonify
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from queue import Queue
import time
import threading
class MyHandler(FileSystemEventHandler):
def __init__(self, pattern=None):
self.pattern = pattern or (".xml", ".tiff", ".jpg")
self.event_q = Queue()
self.dummyThread = None
def on_any_event(self, event):
if not event.is_directory and event.src_path.endswith(self.pattern):
self.event_q.put((event, time.time()))
def start(self):
self.dummyThread = threading.Thread(target=self._process)
self.dummyThread.daemon = True
self.dummyThread.start()
def _process(self):
while True:
time.sleep(1)
app = Flask(__name__)
handler = MyHandler()
handler.start()
eventlist_flag = 0
eventlist = []
def run_watcher():
global eventlist_flag, eventlist
observer = Observer()
observer.schedule(handler, '.')
observer.start()
try:
while True:
if eventlist_flag == 0:
eventlist_flag = 1
while not handler.event_q.empty():
event, ts = handler.event_q.get()
eventlist.append(event)
eventlist_flag = 0
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
@app.route('/watcher/status', methods=['POST'])
def watchernow():
global eventlist_flag, eventlist
if eventlist_flag == 0 and len(eventlist) > 0:
eventlist_flag = 2
for e in eventlist:
print(e)
eventlist = []
eventlist_flag = 0
return jsonify({})
@app.route('/')
def index():
return render_template('index.html')
if __name__ == '__main__':
watcher_thread = threading.Thread(target=run_watcher)
watcher_thread.start()
app.run(debug=True)
watcher_thread.join()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4449 次 |
| 最近记录: |