Python:如何将外部队列与 ProcessPoolExecutor 一起使用?

ris*_*ado 5 python asynchronous multiprocessing

我最近开始使用 Python 的多线程和多处理功能。

我尝试编写代码,使用生产者/消费者方法从 JSON 日志文件中读取块,将这些块作为事件写入队列,然后启动一组进程,这些进程将从该队列(文件块)中轮询事件并处理每个其中之一,打印出结果。

我的意图是首先启动进程,让它们等待事件开始进入队列。

我目前正在使用这段代码,它似乎可以工作,使用我发现的示例中的一些点点滴滴:

import re, sys
from multiprocessing import Process, Queue

def process(file, chunk):
    f = open(file, "rb")
    f.seek(chunk[0])
    for entry in pat.findall(f.read(chunk[1])):
        print(entry)

def getchunks(file, size=1024*1024):
    f = open(file, "rb")
    while True:
        start = f.tell()
        f.seek(size, 1)
        s = f.readline() # skip forward to next line ending
        yield start, f.tell() - start
        if not s:
            break

def processingChunks(queue):
    while True:
        queueEvent = queue.get()
        if (queueEvent == None):
            queue.put(None)
            break
        process(queueEvent[0], queueEvent[1])

if __name__ == "__main__":
    testFile = "testFile.json"
    pat = re.compile(r".*?\n")
    queue = Queue()

    for w in xrange(6):
        p = Process(target=processingChunks, args=(queue,))
        p.start()

    for chunk in getchunks(testFile):
        queue.put((testFile, chunk))
        print(queue.qsize())
    queue.put(None)
Run Code Online (Sandbox Code Playgroud)

但是,我想学习如何使用 concurrent.futures ProcessPoolExecutor 以异步方式使用 Future 结果对象实现相同的结果。

我的第一次尝试暗示使用一个由多处理管理器创建的外部队列,我会将其传递给进程进行轮询。

然而,这似乎不起作用,我认为这可能不是 ProcessPoolExecutor 设计的工作方式,因为它似乎使用了它自己的内部队列。

我使用了这个代码:

import concurrent
from concurrent.futures import as_completed
import re, sys
from multiprocessing import Lock, Process, Queue, current_process, Pool, Manager

def process(file, chunk):
    entries = []
    f = open(file, "rb")
    f.seek(chunk[0])
    for entry in pat.findall(f.read(chunk[1])):
        entries.append(entry)
        return entries

def getchunks(file, size=1024*1024):
    f = open(file, "rb")
    while True:
        start = f.tell()
        f.seek(size, 1)
        s = f.readline() # skip forward to next line ending
        yield start, f.tell() - start
        if not s:
            break

def processingChunks(queue):
    while True:
        queueEvent = queue.get()
        if (queueEvent == None):
            queue.put(None)
            break
        return process(queueEvent[0], queueEvent[1])

if __name__ == "__main__":
    testFile = "testFile.json"
    pat = re.compile(r".*?\n")
    procManager = Manager()
    queue = procManager.Queue()

    with concurrent.futures.ProcessPoolExecutor(max_workers = 6) as executor:
        futureResults = []
        for i in range(6):
            future_result = executor.submit(processingChunks, queue)
            futureResults.append(future_result)

        for complete in as_completed(futureResults):
            res = complete.result()
            for i in res:
                print(i)


    for chunk in getchunks(testFile):
        queue.put((testFile, chunk))
        print(queue.qsize())
    queue.put(None)
Run Code Online (Sandbox Code Playgroud)

我无法获得任何结果,所以很明显我做错了,而且我不理解这个概念。

你们能帮我理解我如何实现这个吗?

Blc*_*ght 1

如果您使用的是ProcessPoolExecutor,则根本不需要您的processingChunks函数,也不需要从 导入的任何内容multiprocessing。该池基本上自动执行您的函数之前所做的事情。相反,使用类似这样的方法来排队并一次性分派所有工作:

with concurrent.futures.ProcessPoolExecutor(max_workers = 6) as executor:
    executor.map(process, itertools.repeat(testFile), getchunks(testFile))
Run Code Online (Sandbox Code Playgroud)

我不确定您的原始代码如何在pat不成为参数的情况下工作process(我预计每个工作进程都会因NameError异常而失败)。如果这是一个真正的问题(而不仅仅是示例代码的产物),您可能需要进行更多修改,以将其与 和 一起传递到工作进程filechunk可能itertools.repeat会派上用场)。