使用 ProcessPoolExecuter 时如何获取子进程 PID?

Mii*_*ind 7 python apache-kafka

我正在使用 ProcessPoolExecutor 上下文管理器并行运行多个 Kafka 使用者。我需要存储子进程的进程 ID,以便稍后我可以干净地终止这些进程。我有这样的代码:

Class MultiProcessConsumer:
    ...

    def run_in_parallel(self):
        parallelism_factor = 5

        with ProcessPoolExecutor() as executor:
            processes = [executor.submit(self.consume) for _ in range(parallelism_factor)]
            # It would be nice If I could write [process.pid for process in processes] to a file here.

    def consume(self):
        while True:
            for message in self.kafka_consumer:
                do_stuff(message)
Run Code Online (Sandbox Code Playgroud)

os.get_pid()我知道我可以在 Consumer 方法中使用来获取 PID。但是,正确处理它们(在消费者不断关闭或启动的情况下)需要一些额外的工作。

您建议我如何在这样的上下文中获取并存储子进程的 PID?

Joe*_*Joe 5

os.getpid()似乎是要走的路。只需将它们通过队列或管道传递,并结合之前传递给进程的一些随机 UUID 来识别 PID。

from concurrent.futures import ProcessPoolExecutor
import os
import time 
import uuid
#from multiprocessing import Process, Queue
import multiprocessing
import queue
#The Empty exception in in Queue, multiprocessing borrows 
#it from there

# /sf/ask/693614701/     
m = multiprocessing.Manager()
q = m.Queue()

def task(n, queue, uuid):
    my_pid = os.getpid()
    print("Executing our Task on Process {}".format(my_pid))
    queue.put((uuid, my_pid))
    time.sleep(n)
    return n * n

def main():

    with ProcessPoolExecutor(max_workers = 3) as executor:

        some_dict = {}
        for i in range(10):
            print(i)

            u = uuid.uuid4()
            f = executor.submit(task, i, q, u)
            some_dict[u] = [f, None] # PID not known here

            try:
                rcv_uuid, rcv_pid = q.get(block=True, timeout=1)
                some_dict[rcv_uuid][1] = rcv_pid # store PID
            except queue.Empty as e:
                print('handle me', e)
            print('I am', rcv_uuid, 'and my PID is', rcv_pid)


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)