hav*_*063 8 python watchdog multiprocessing python-multithreading
我正在使用Python的Watchdog监视给定目录中正在创建的新文件.创建文件时,会运行一些代码生成子进程shell命令以运行不同的代码来处理此文件.这应该为每个创建的新文件运行.我已经在创建一个文件时对此进行了测试,并且工作正常,但是在创建多个文件时,无论是同时还是一个接一个地创建它都很困难.
我当前的问题是这个...在shell中运行的处理代码需要一段时间才能运行,并且在目录中创建新文件之前不会完成.我无能为力.在此代码运行时,看门狗将无法识别已创建新文件,并且不会继续执行该代码.
所以我认为我需要为每个新文件生成一个新进程,或者做一些事情来同时运行,而不是等到一个文件完成后再处理下一个文件.
所以我的问题是:
1.)实际上,我将在一个目录中同时创建4个不同系列的文件.看门狗一次运行所有4个文件的文件创建代码的最佳方法是什么?
2.)当代码针对一个文件运行时,如何让监视程序开始处理同一系列中的下一个文件,而不必等到前一个文件的处理完成.这是必要的,因为文件是特定的,我需要暂停一个文件的处理,直到另一个文件完成,但它们的创建顺序可能会有所不同.
我是否需要以某种方式将看门狗与多处理或线程结合起来?或者我需要实现多个观察者?我有点不知所措.谢谢你的帮助.
class MonitorFiles(FileSystemEventHandler):
'''Sub-class of watchdog event handler'''
def __init__(self, config=None, log=None):
self.log = log
self.config = config
def on_created(self, event):
file = os.path.basename(event.src_path)
self.log.info('Created file {0}'.format(event.src_path))
dosWatch.go(event.src_path, self.config, self.log)
def on_modified(self, event):
file = os.path.basename(event.src_path)
ext = os.path.splitext(file)[1]
if ext == '.fits':
self.log.warning('Modifying a FITS file is not allowed')
return
def on_deleted(self, event):
self.log.critical('Nothing should ever be deleted from here!')
return
Run Code Online (Sandbox Code Playgroud)
def monitor(config, log):
'''Uses the Watchdog package to monitor the data directory for new files.
See the MonitorFiles class in dosClasses for actual monitoring code'''
event_handler = dosclass.MonitorFiles(config, log)
# add logging the the event handler
log_handler = LoggingEventHandler()
# set up observer
observer = Observer()
observer.schedule(event_handler, path=config.fitsDir, recursive=False)
observer.schedule(log_handler, config.fitsDir, recursive=False)
observer.start()
log.info('Begin MaNGA DOS!')
log.info('Start watching directory {0} for new files ...'.format(config.fitsDir))
# monitor
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.unschedule_all()
observer.stop()
log.info('Stop watching directory ...')
log.info('End MaNGA DOS!')
log.info('--------------------------')
log.info('')
observer.join()
Run Code Online (Sandbox Code Playgroud)
在上面,我的monitor方法设置watchdog来监视主目录.MonitorFiles类定义创建文件时发生的情况.它基本上调用这个dosWatch.go方法,该方法最终调用subprocess.Popen来运行shell命令.
这就是我最终所做的,它解决了我的问题。我使用多处理来启动单独的看门狗监视进程来分别监视每个文件。看门狗已经为我排队了新文件,这对我来说很好。
至于上面的第 2 点,我需要在 file1 之前处理 file2,即使 file1 是先创建的。因此,在 file1 期间,我检查 file2 处理的输出。如果找到它,它将继续处理 file1。如果没有,则退出。在处理 file2 时,我检查 file1 是否已创建,如果是,则处理 file1。(未显示此代码)
def monitorCam(camera, config, mainlog):
'''Uses the Watchdog package to monitor the data directory for new files.
See the MonitorFiles class in dosClasses for actual monitoring code. Monitors each camera.'''
mainlog.info('Process Name, PID: {0},{1}'.format(mp.current_process().name,mp.current_process().pid))
#init cam log
camlog = initLogger(config, filename='manga_dos_{0}'.format(camera))
camlog.info('Camera {0}, PID {1} '.format(camera,mp.current_process().pid))
config.camera=camera
event_handler = dosclass.MonitorFiles(config, camlog, mainlog)
# add logging the the event handler
log_handler = LoggingEventHandler()
# set up observer
observer = Observer()
observer.schedule(event_handler, path=config.fitsDir, recursive=False)
observer.schedule(log_handler, config.fitsDir, recursive=False)
observer.daemon=True
observer.start()
camlog.info('Begin MaNGA DOS!')
camlog.info('Start watching directory {0} for new files ...'.format(config.fitsDir))
camlog.info('Watching directory {0} for new files from camera {1}'.format(config.fitsDir,camera))
# monitor
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.unschedule_all()
observer.stop()
camlog.info('Stop watching directory ...')
camlog.info('End MaNGA DOS!')
camlog.info('--------------------------')
camlog.info('')
#observer.join()
if observer.is_alive():
camlog.info('still alive')
else:
camlog.info('thread ending')
Run Code Online (Sandbox Code Playgroud)
def startProcess(camera,config,log):
''' Uses multiprocessing module to start 4 different camera monitoring processes'''
jobs=[]
#pdb.set_trace()
#log.info(mp.log_to_stderr(logging.DEBUG))
for i in range(len(camera)):
log.info('Starting to monitor camera {0}'.format(camera[i]))
print 'Starting to monitor camera {0}'.format(camera[i])
try:
p = mp.Process(target=monitorCam, args=(camera[i],config, log), name=camera[i])
p.daemon=True
jobs.append(p)
p.start()
except KeyboardInterrupt:
log.info('Ending process: {0} for camera {1}'.format(mp.current_process().pid, camera[i]))
p.terminate()
log.info('Terminated: {0}, {1}'.format(p,p.is_alive()))
for i in range(len(jobs)):
jobs[i].join()
return
Run Code Online (Sandbox Code Playgroud)