Thi*_*mer 5 python queue process multiprocessing
我正在构建一个Python脚本/应用程序,它启动了多个所谓的Fetchers.他们反过来做某事并将数据返回队列.
我想确保Fetchers的运行时间不超过60秒(因为整个应用程序在一小时内运行多次).
阅读Python文档时,我注意到他们在使用Process.Terminate()时要小心,因为它可能会破坏Queue.
我目前的代码:
# Result Queue
resultQueue = Queue();
# Create Fetcher Instance
fetcher = fetcherClass()
# Create Fetcher Process List
fetcherProcesses = []
# Run Fetchers
for config in configList:
# Create Process to encapsulate Fetcher
log.debug("Creating Fetcher for Target: %s" % config['object_name'])
fetcherProcess = Process(target=fetcher.Run, args=(config,resultQueue))
log.debug("Starting Fetcher for Target: %s" % config['object_name'])
fetcherProcess.start()
fetcherProcesses.append((config, fetcherProcess))
# Wait for all Workers to complete
for config, fetcherProcess in fetcherProcesses:
log.debug("Waiting for Thread to complete (%s)." % str(config['object_name']))
fetcherProcess.join(DEFAULT_FETCHER_TIMEOUT)
if fetcherProcess.is_alive():
log.critical("Fetcher thread for object %s Timed Out! Terminating..." % config['object_name'])
fetcherProcess.terminate()
# Loop thru results, and save them in RRD
while not resultQueue.empty():
config, fetcherResult = resultQueue.get()
result = storage.Save(config, fetcherResult)
Run Code Online (Sandbox Code Playgroud)
我想确保当我的一个Fetchers超时时我的队列不会被破坏.
做这个的最好方式是什么?
编辑:回应与sebdelsol聊天时的一些说明:
1)我想尽快开始处理数据,因为否则我必须同时执行大量的磁盘密集型操作.因此,睡眠X_Timeout的主线程不是一个选项.
2)我需要等待Timeout只有一次,但每个进程,所以如果主线程启动50个取件器,这需要几秒钟到半分钟,我需要补偿.
3)我想确保来自Queue.Get()的数据被没有超时的Fetcher放在那里(因为理论上,当超时发生时,提取器将数据放入队列中是可能的,它被枪杀......)该数据应该被抛弃.
当发生超时时,这不是一件非常糟糕的事情,这不是一个理想的情况,但是腐败的数据更糟糕.
您可以将新内容传递给multiprocessing.Lock()您开始的每个抓取器.
在fetcher的进程中,一定要Queue.put()使用此锁包装:
with self.lock:
self.queue.put(result)
Run Code Online (Sandbox Code Playgroud)
当您需要终止fetcher的进程时,请使用其锁:
with fetcherLock:
fetcherProcess.terminate()
Run Code Online (Sandbox Code Playgroud)
这样,在队列访问期间杀死一个fetcher就不会破坏你的队列.
一些fetcher的锁可能会被破坏.但是,这不是问题,因为你推出的每个新推文都有一个全新的锁.
| 归档时间: |
|
| 查看次数: |
639 次 |
| 最近记录: |