Mii*_*ind 7 python apache-kafka
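I'm using a ProcessPoolExecutor context manager to run several Kafka consumers in parallel. I need to store the process IDs of the child processes so that I can cleanly terminate them later. My code looks like this: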
from concurrent.futures import ProcessPoolExecutor

class MultiProcessConsumer:
    ...
    def run_in_parallel(self):
        parallelism_factor = 5
        # One worker per consumer: consume() never returns, so extra tasks would never start.
        with ProcessPoolExecutor(max_workers=parallelism_factor) as executor:
            processes = [executor.submit(self.consume) for _ in range(parallelism_factor)]
            # It would be nice if I could write [process.pid for process in processes] to a file here.

    def consume(self):
        while True:
            for message in self.kafka_consumer:
                do_stuff(message)
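I know I can call os.getpid() inside the consume method to get the PID. However, handling that correctly (with consumers constantly shutting down and starting up) takes some extra work.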
How would you suggest getting and storing the child processes' PIDs in a context like this?
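os.getpid() seems to be the way to go. Just pass the PIDs back through a queue or pipe, together with a random UUID that you handed to the process beforehand, so each PID can be matched to the task that reported it.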
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import os
import queue
# multiprocessing queues raise queue.Empty, which multiprocessing borrows
# from the standard-library queue module
# /sf/ask/693614701/
import time
import uuid

def task(n, q, task_id):
    my_pid = os.getpid()
    print("Executing our Task on Process {}".format(my_pid))
    q.put((task_id, my_pid))  # report (task UUID, worker PID) to the parent
    time.sleep(n)
    return n * n

def main():
    # Create the Manager inside main(): at module level, children started with
    # the "spawn" start method would try to start their own Manager on import.
    with multiprocessing.Manager() as m:
        q = m.Queue()
        some_dict = {}
        with ProcessPoolExecutor(max_workers=3) as executor:
            for i in range(10):
                print(i)
                u = uuid.uuid4()
                f = executor.submit(task, i, q, u)
                some_dict[u] = [f, None]  # PID not known here
                try:
                    rcv_uuid, rcv_pid = q.get(block=True, timeout=1)
                except queue.Empty as e:
                    print('handle me', e)
                else:
                    some_dict[rcv_uuid][1] = rcv_pid  # store PID
                    print('I am', rcv_uuid, 'and my PID is', rcv_pid)
        # The pool has shut down; collect PIDs that arrived after a timeout.
        while True:
            try:
                rcv_uuid, rcv_pid = q.get_nowait()
            except queue.Empty:
                break
            some_dict[rcv_uuid][1] = rcv_pid
            print('I am', rcv_uuid, 'and my PID is', rcv_pid)

if __name__ == '__main__':
    main()
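If what you ultimately need is the set of worker processes to terminate later (rather than one PID per task), a possible variation is ProcessPoolExecutor's initializer/initargs parameters (Python 3.7+), which run a function once in each worker process as it starts. This is a swapped-in technique, not the method from the answer: report_pid, square and run_with_worker_pids are hypothetical names, and square only stands in for the real Kafka consume loop.

```python
import multiprocessing
import os
import queue
from concurrent.futures import ProcessPoolExecutor


def report_pid(pid_queue):
    # Hypothetical initializer: runs once in each worker process when the pool starts it.
    pid_queue.put(os.getpid())


def square(n):
    # Hypothetical stand-in for real work (e.g. a Kafka consume loop).
    return n * n


def run_with_worker_pids(num_workers=3):
    """Run some tasks and return (results, set of worker PIDs)."""
    with multiprocessing.Manager() as manager:
        pid_queue = manager.Queue()
        with ProcessPoolExecutor(max_workers=num_workers,
                                 initializer=report_pid,
                                 initargs=(pid_queue,)) as executor:
            results = list(executor.map(square, range(10)))
        # The pool has shut down, so every started worker has already reported its PID.
        worker_pids = set()
        while True:
            try:
                worker_pids.add(pid_queue.get_nowait())
            except queue.Empty:
                break
    return results, worker_pids


if __name__ == "__main__":
    results, pids = run_with_worker_pids()
    print(results)
    print("worker PIDs:", sorted(pids))
```

Each started worker reports its PID exactly once, so the set can hold fewer than max_workers entries if the pool never needed to start them all. Keep in mind that if one of these PIDs is killed abruptly (for example with os.kill), the executor is marked broken and pending futures raise BrokenProcessPool, so terminating workers this way effectively ends the whole pool.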