使用Process.Terminate()时如何解决队列损坏

Thi*_*mer 5 python queue process multiprocessing

我正在构建一个Python脚本/应用程序,它启动了多个所谓的Fetchers.他们反过来做某事并将数据返回队列.

我想确保Fetchers的运行时间不超过60秒(因为整个应用程序在一小时内运行多次).

阅读Python文档时,我注意到他们在使用Process.Terminate()时要小心,因为它可能会破坏Queue.

我目前的代码:

# Result Queue
resultQueue = Queue();

# Create Fetcher Instance
fetcher = fetcherClass()

# Create Fetcher Process List
fetcherProcesses = []

# Run Fetchers
for config in configList:
    # Create Process to encapsulate Fetcher
    log.debug("Creating Fetcher for Target: %s" % config['object_name'])
    fetcherProcess = Process(target=fetcher.Run, args=(config,resultQueue))

    log.debug("Starting Fetcher for Target: %s" % config['object_name'])
    fetcherProcess.start()
    fetcherProcesses.append((config, fetcherProcess))

# Wait for all Workers to complete
for config, fetcherProcess in fetcherProcesses:
    log.debug("Waiting for Thread to complete (%s)." % str(config['object_name']))
    fetcherProcess.join(DEFAULT_FETCHER_TIMEOUT)
    if fetcherProcess.is_alive():
        log.critical("Fetcher thread for object %s Timed Out! Terminating..." % config['object_name'])
        fetcherProcess.terminate()

# Loop thru results, and save them in RRD
while not resultQueue.empty():
    config, fetcherResult = resultQueue.get()
    result = storage.Save(config, fetcherResult)
Run Code Online (Sandbox Code Playgroud)

我想确保当我的一个Fetchers超时时我的队列不会被破坏.

做这个的最好方式是什么?

编辑:回应与sebdelsol聊天时的一些说明:

1)我想尽快开始处理数据,因为否则我必须同时执行大量的磁盘密集型操作.因此,睡眠X_Timeout的主线程不是一个选项.

2)我需要等待Timeout只有一次,但每个进程,所以如果主线程启动50个取件器,这需要几秒钟到半分钟,我需要补偿.

3)我想确保来自Queue.Get()的数据被没有超时的Fetcher放在那里(因为理论上,当超时发生时,提取器将数据放入队列中是可能的,它被枪杀......)该数据应该被抛弃.

当发生超时时,这不是一件非常糟糕的事情,这不是一个理想的情况,但是腐败的数据更糟糕.

seb*_*sol 6

您可以将新内容传递给multiprocessing.Lock()您开始的每个抓取器.

在fetcher的进程中,一定要Queue.put()使用此锁包装:

with self.lock:
    self.queue.put(result)
Run Code Online (Sandbox Code Playgroud)

当您需要终止fetcher的进程时,请使用其锁:

with fetcherLock:
    fetcherProcess.terminate()
Run Code Online (Sandbox Code Playgroud)

这样,在队列访问期间杀死一个fetcher就不会破坏你的队列.

一些fetcher的锁可能会被破坏.但是,这不是问题,因为你推出的每个新推文都有一个全新的锁.