Vor*_*Vor 20 python linux multithreading
我对multiprocessing模块很新.我只是尝试创建以下内容:我有一个进程,其工作是从RabbitMQ获取消息并将其传递给内部队列(multiprocessing.Queue).然后我想要做的是:在新消息进入时生成一个进程.它可以工作,但是在作业完成后,它会让僵尸进程不被它的父进程终止.这是我的代码:
主要流程:
#!/usr/bin/env python
import multiprocessing
import logging
import consumer
import producer
import worker
import time
import base
conf = base.get_settings()
logger = base.logger(identity='launcher')
request_order_q = multiprocessing.Queue()
result_order_q = multiprocessing.Queue()
request_status_q = multiprocessing.Queue()
result_status_q = multiprocessing.Queue()
CONSUMER_KEYS = [{'queue':'product.order',
'routing_key':'product.order',
'internal_q':request_order_q}]
# {'queue':'product.status',
# 'routing_key':'product.status',
# 'internal_q':request_status_q}]
def main():
# Launch consumers
for key in CONSUMER_KEYS:
cons = consumer.RabbitConsumer(rabbit_q=key['queue'],
routing_key=key['routing_key'],
internal_q=key['internal_q'])
cons.start()
# Check reques_order_q if not empty spaw a process and process message
while True:
time.sleep(0.5)
if not request_order_q.empty():
handler = worker.Worker(request_order_q.get())
logger.info('Launching Worker')
handler.start()
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
这是我的工人:
import multiprocessing
import sys
import time
import base
conf = base.get_settings()
logger = base.logger(identity='worker')
class Worker(multiprocessing.Process):
def __init__(self, msg):
super(Worker, self).__init__()
self.msg = msg
self.daemon = True
def run(self):
logger.info('%s' % self.msg)
time.sleep(10)
sys.exit(1)
Run Code Online (Sandbox Code Playgroud)
所以在处理完所有消息后,我可以看到带ps aux命令的进程.但我真的希望他们一旦完成就被终止.谢谢.
Mar*_* K. 12
有几件事:
确保父母joins的子女,以避免僵尸.请参阅Python多处理程序杀死进程
您可以检查子项是否仍在使用is_alive()成员函数运行.请参见http://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process
使用multiprocessing.active_children比...更好Process.join.该函数active_children清除自上次调用以来创建的任何僵尸active_children.该方法join等待所选过程.在此期间,其他进程可以终止并变为僵尸,但在等待等待的方法加入之前,父进程将不会注意到.要看到这个:
import multiprocessing as mp
import time
def main():
n = 3
c = list()
for i in xrange(n):
d = dict(i=i)
p = mp.Process(target=count, kwargs=d)
p.start()
c.append(p)
for p in reversed(c):
p.join()
print('joined')
def count(i):
print('{i} going to sleep'.format(i=i))
time.sleep(i * 10)
print('{i} woke up'.format(i=i))
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
以上将创建3个进程,每个进程间隔10秒.代码是,最后一个进程首先连接,所以其他两个,先前终止,将是20秒的僵尸.你可以看到它们:
ps aux | grep Z
Run Code Online (Sandbox Code Playgroud)
如果等待它们将终止它们的过程中将没有僵尸.删除reversed以查看此案例.但是,在实际应用中,我们很少知道子节点将终止的顺序,因此使用join会导致一些僵尸.
替代方案active_children不会留下任何僵尸.在上面的示例中,将循环替换为for p in reversed(c)::
while True:
time.sleep(1)
if not mp.active_children():
break
Run Code Online (Sandbox Code Playgroud)
看看会发生什么.