标签: python-watchdog

在Python中从同一个类中调用另一个方法

我是python的新手.我试图将值从一个方法传递到类中的另一个方法.我搜索了这个问题,但我找不到合适的解决方案.因为在我的代码中,"if"正在调用类的方法"on_any_event",反过来应该调用另一个方法"dropbox_fn",它使用"on_any_event"中的值.如果"dropbox_fn"方法在类之外,它会工作吗?

我将用代码说明.

class MyHandler(FileSystemEventHandler):
 def on_any_event(self, event):
    srcpath=event.src_path
    print (srcpath, 'has been ',event.event_type)
    print (datetime.datetime.now())
    #print srcpath.split(' ', 12 );
    filename=srcpath[12:]
    return filename # I tried to call the method. showed error like not callable 

 def dropbox_fn(self)# Or will it work if this methos is outside the class ?
    #this method uses "filename"

if __name__ == "__main__":
  path = sys.argv[1] if len(sys.argv) > 1 else '.'
  print ("entry")
  event_handler = MyHandler()
  observer = Observer()
  observer.schedule(event_handler, path, recursive=True)
  observer.start()
  try:
     while …
Run Code Online (Sandbox Code Playgroud)

python methods class event-handling python-watchdog

22
推荐指数
2
解决办法
7万
查看次数

“无法从‘watchdog.events’导入名称‘EVENT_TYPE_OPENED’”

我正在尝试制作 REST api (初学者),但是当我尝试从此代码初始化服务器时:

from flask import Flask

app = Flask(__name__)

if __name__=='__main__':
    app.run(debug=True, port=4000)
Run Code Online (Sandbox Code Playgroud)

我在提示中收到此错误:

 from watchdog.events import EVENT_TYPE_OPENED
ImportError: cannot import name 'EVENT_TYPE_OPENED' from 'watchdog.events' 
(C:\ ********* \Python\Python310\lib\site-packages\watchdog\events.py)
Run Code Online (Sandbox Code Playgroud)

我期待这样的事情(分钟 8:27): https://www.youtube.com/watch?v=GMppyAPbLYk &ab_channel=TechWithTim

python flask python-watchdog

17
推荐指数
1
解决办法
2万
查看次数

python 脚本无限期运行时如何刷新 boto3 凭据

我正在尝试编写一个 python 脚本,该脚本使用 watchdog 来查找文件创建并使用 boto3 将其上传到 s3。但是,我的 boto3 凭据每 12 小时就会过期,因此我需要更新它们。我将我的 boto3 凭据存储在~/.aws/credentials. 所以现在我正在尝试捕获S3UploadFailedError、更新凭据并将它们写入~/.aws/credentials. 但是,尽管凭证正在更新,但我boto3.client('s3')再次调用它的抛出异常。

我究竟做错了什么?或者我该如何解决?

下面是代码片段

try:
     s3 = boto3.client('s3')
     s3.upload_file(event.src_path,'bucket-name',event.src_path)

except boto3.exceptions.S3UploadFailedError as e:
     print(e)
     get_aws_credentials()
     s3 = boto3.client('s3')

Run Code Online (Sandbox Code Playgroud)

python-3.x python-watchdog boto3

15
推荐指数
2
解决办法
2万
查看次数

Python看门狗窗口等到复制完成

我在Windows 2012服务器上使用Python监视程序模块来监视共享驱动器上出现的新文件.当看门狗注意到新文件时,它将启动数据库恢复过程.

但是,监视程序似乎会尝试在创建的第二个文件中恢复文件,而不是等到文件完成复制到共享驱动器.所以我将事件更改为on_modified但是有两个on_modified事件,一个是文件最初被复制时的事件,另一个是完成复制后的事件.

如何将两个on_modified事件处理为仅在复制到共享驱动器的文件完成时触发?

将多个文件同时复制到共享驱动器时会发生什么?

这是我的代码

import time
import subprocess
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class NewFile(FileSystemEventHandler):
    def process(self, event):
        if event.is_directory:
            return

    if event.event_type == 'modified':            
        if getext(event.src_path) == 'gz':
            load_pgdump(event.src_path)

    def on_modified(self, event):
        self.process(event)

def getext(filename):
    "Get the file extension"
    file_ext = filename.split(".",1)[1]
    return file_ext

def load_pgdump(src_path):    
    restore = 'pg_restore command ' + src_path
    subprocess.call(restore, shell=True)

def main():
    event_handler = NewFile()
    observer = Observer()
    observer.schedule(event_handler, path='Y:\\', recursive=True)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop() …
Run Code Online (Sandbox Code Playgroud)

python python-watchdog

9
推荐指数
2
解决办法
5042
查看次数

看门狗在Python 3中获得三次事件

我正在使用Watchdog在Python中创建一个程序,该程序监视一组文件并根据更改采取操作.我把他们网站上的确切示例放在一个文件中:

import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    event_handler = LoggingEventHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
Run Code Online (Sandbox Code Playgroud)

然后,我注意到一些奇怪的事情 我以相同的方式为Python 2和Python 3安装了看门狗(使用pip2 install watchdogpip3 install watchdog).然而,当我运行在Python 2和3的程序,做同样的修改一次每个,出现这种情况:

$ python2 watch_test.py
2015-09-30 11:18:32 - Modified …
Run Code Online (Sandbox Code Playgroud)

python python-2.x python-3.x python-watchdog

8
推荐指数
0
解决办法
812
查看次数

PyQT线程最简单的方法

我在PyQt中有一个带有函数的GUI addImage(image_path).很容易想象,当应该将新图像添加到QListWidget中时调用它.为了检测文件夹中的新图像,我使用threading.Threadwith watchdog来检测文件夹中的文件更改,然后该线程addImage直接调用.

QPixmap由于线程安全的原因,这会产生不应在gui线程外部调用的警告.

使线程安全的最佳和最简单的方法是什么?QThread的?信号/插槽?QMetaObject.invokeMethod?我只需要从线程传递一个字符串addImage.

python qt pyqt thread-safety python-watchdog

8
推荐指数
2
解决办法
2万
查看次数

如何使用Python Watchdog在目录中发生任何变化时运行函数?

我试图使用看门狗在dir中发生任何变化时运行同步脚本(除了一个特定文件).我只是复制了自述文件中的代码(粘贴在下面),它完成了它所说的内容; 记录哪个文件已更改.

import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    event_handler = LoggingEventHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
Run Code Online (Sandbox Code Playgroud)

我现在想要在任何变化时运行一个函数(它将整个文件夹同步到远程机器).所以我只event_handler用我自己的功能代替了.但是这给了我以下错误:

Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/Library/Python/2.7/site-packages/watchdog/observers/api.py", line 199, in run
    self.dispatch_events(self.event_queue, self.timeout)
  File "/Library/Python/2.7/site-packages/watchdog/observers/api.py", …
Run Code Online (Sandbox Code Playgroud)

python filesystems filesystemwatcher watchdog python-watchdog

7
推荐指数
2
解决办法
4443
查看次数

如何查看文件,而不是使用Python进行更改的目录?

问题:如何使用Python查看文件以进行更改?建议使用看门狗,但我发现它只能观看目录,而不是文件.watchdog-test.py是看门狗的示例脚本:

$ python watchdog-test.py ab_test_res.sh &
[1] 30628
fbt@fbt64:~/laike9m$ Traceback (most recent call last):
  File "watchdog-test.py", line 15, in <module>
    observer.start()
  File "/usr/local/lib/python2.7/dist-packages/watchdog/observers/api.py", line 255, in start
    emitter.start()
  File "/usr/local/lib/python2.7/dist-packages/watchdog/utils/__init__.py", line 111, in start
    self.on_thread_start()
  File "/usr/local/lib/python2.7/dist-packages/watchdog/observers/inotify.py", line 121, in on_thread_start
    self._inotify = InotifyBuffer(path, self.watch.is_recursive)
  File "/usr/local/lib/python2.7/dist-packages/watchdog/observers/inotify_buffer.py", line 35, in __init__
    self._inotify = Inotify(path, recursive)
  File "/usr/local/lib/python2.7/dist-packages/watchdog/observers/inotify_c.py", line 187, in __init__
    self._add_dir_watch(path, recursive, event_mask)
  File "/usr/local/lib/python2.7/dist-packages/watchdog/observers/inotify_c.py", line 363, in _add_dir_watch
    raise OSError('Path is not a directory') …
Run Code Online (Sandbox Code Playgroud)

python file watch python-watchdog

6
推荐指数
1
解决办法
2654
查看次数

看门狗观察者未在容器中运行

我正在使用看门狗来监视文件目录中的文件系统事件。如果此观察程序脚本检测到事件,我想在数据库(在本例中为 Mongodb)中创建一个条目。要监视的文件系统实际上是一个 Docker 卷,将其文件系统链接到我的主机电脑。所有代码都在 docker 容器中运行。我可以附加到系统中的任何容器并使用 pymongo 正确地将条目添加到数据库中。

我还可以在主机上运行观察程序,并且所有操作都按预期工作(包括在容器中的链接文件系统对文件系统进行更改时按预期操作。

但是,当我在容器中运行观察程序代码时,文件系统事件触发的事件处理程序中的方法似乎永远不会被调用。当然没有创建数据库条目。

代码来源有3个:

  1. 看门狗目录“watcher”包括一个观察者和一个处理程序[watchdog_classes]

    import time
    from watchdog.observers import Observer
    import watchdog.events as events
    from data_persist import persistance_interface
    
    db_interface= persistance_interface()
    
    class RepoWatcher:
    
    
    
       def __init__(self, dir_root='/targer_dir/'):
            print(dir_root)
            self.observer = Observer()
            self.dir_root = dir_root
    
        def run(self):
            event_handler = CustomEventHandler()
            self.observer.schedule(event_handler, self.dir_root, recursive=True)
            self.observer.start()
            try:
                while True:
                    time.sleep(5)
            except KeyboardInterrupt:
                self.observer.stop()
                print("Shutting down...")
    
            self.observer.join()
    
    class CustomEventHandler(events.FileSystemEventHandler):
        @staticmethod
        def on_any_event(event):
    
            # Renamed files or dirs
            if isinstance(event, events.FileSystemMovedEvent):
                print("moved")
                db_interface.persist_one({'rename': …
    Run Code Online (Sandbox Code Playgroud)

python-3.x docker python-watchdog

6
推荐指数
1
解决办法
4443
查看次数

Python Watchdog 在启动时处理现有文件

我有一个简单的 Watchdog 和 Queue 进程来监视目录中的文件。代码取自https://camcairns.github.io/python/2017/09/06/python_watchdog_jobs_queue.html

import time
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
from queue import Queue
from threading import Thread

dir_path = "/data"

def process_queue(q):

    while True:
        if not q.empty():
            event = q.get()
            print("New event %s" % event)

        time.sleep(5)


class FileWatchdog(PatternMatchingEventHandler):

    def __init__(self, queue, patterns):
        PatternMatchingEventHandler.__init__(self, patterns=patterns)
        self.queue = queue

    def process(self, event):
        self.queue.put(event)

    def on_created(self, event):
        self.process(event)


if __name__ == '__main__':

    watchdog_queue = Queue()

    worker = Thread(target=process_queue, args=(watchdog_queue,))
    worker.setDaemon(True)
    worker.start()

    event_handler = FileWatchdog(watchdog_queue, patterns="*.ini")
    observer …
Run Code Online (Sandbox Code Playgroud)

python queue watchdog python-watchdog

5
推荐指数
1
解决办法
778
查看次数