看门狗和matplotlib:当目录中有新文件时,处理图像并显示结果

use*_*540 13 python file matplotlib watchdog

我正在尝试创建一个简单的应用程序,其中

  1. 将图像推送到目录中(通过外部进程)

  2. Python看门狗触发器,图像由函数处理,结果显示在窗口中

作业连续运行,并且在图像进入目录时触发处理功能。结果的绘图窗口应仅用新结果更新,而不是关闭窗口然后重新绘图。

下面的代码不显示结果。绘图窗口保持空白,然后崩溃。如果除matplotlib之外的其他工具可以轻松完成此工作,那也很好。

# plt is matplotlib.pyplot

def process_and_plot(test_file):
    y, x = getresults(test_file) # function which returns results on image file

    y_pos = range(len(y))
    plt.figure(num=1,figsize=(20,10))
    plt.bar(y_pos, y, align='center')
    plt.xticks(y_pos, x)
    plt.show()

# to trigger the proess_and_plt function when a new file comes in directory 

class ExampleHandler(FileSystemEventHandler):
    def on_created(self, event): 
        print event.src_path
        process_and_plot(event.src_path)

event_handler = ExampleHandler() 
observer.schedule(event_handler, path='path/to/directory')
observer.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()

observer.join()

Run Code Online (Sandbox Code Playgroud)

Ulr*_*ern 2

为了让您的代码正常工作,我唯一需要做的就是替换plt.show()plt.pause(.001),它是非阻塞的,并在暂停之前更新并显示图形(请参阅文档)。

SO 上最好的相关答案似乎是这样的。有一些使用plt.show(False)plt.ion()使其成为plt.show()非阻塞的建议;对于 Matplotlib 2.2.4 来说,两者都不起作用。

这是完整的代码,因为问题中的代码遗漏了几行:

import matplotlib.pyplot as plt, time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

def process_and_plot(test_file):
    #y, x = getresults(test_file) # function which returns results on image file
    y, x = [2, 4, 3], [0, 1, 2]

    y_pos = range(len(y))
    plt.figure(num=1,figsize=(20,10))
    plt.title(test_file)
    plt.bar(y_pos, y, align='center')
    plt.xticks(y_pos, x)
    plt.pause(.001)

# to trigger the proess_and_plt function when a new file comes in directory 

class ExampleHandler(FileSystemEventHandler):
    def on_created(self, event):
        print event.src_path
        process_and_plot(event.src_path)

event_handler = ExampleHandler()
observer = Observer()
observer.schedule(event_handler, path='/path/to/directory')
observer.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()

observer.join()
Run Code Online (Sandbox Code Playgroud)