axe*_*nde 5 python multithreading network-programming multiprocessing
我正在尝试使用一组计算机来运行数百万个小型模拟。为此,我尝试在主计算机上设置两台“服务器”,一台用于将队列中的输入变量添加到网络,另一台用于处理结果。
这是将内容放入模拟变量队列的代码:
"""This script reads start parameters and calls on run_sim to run the
simulations"""
import time
from multiprocessing import Process, freeze_support, Manager, Value, Queue, current_process
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
class MultiComputers(Process):
def __init__(self, sim_name, queue):
self.sim_name = sim_name
self.queue = queue
super(MultiComputers, self).__init__()
def get_sim_obj(self, offset, db):
"""returns a list of lists from a database query"""
def handle_queue(self):
self.sim_nr = 0
sims = self.get_sim_obj()
self.total = len(sims)
while len(sims) > 0:
if self.queue.qsize() > 100:
self.queue.put(sims[0])
self.sim_nr += 1
print(self.sim_nr, round(self.sim_nr/self.total * 100, 2), self.queue.qsize())
del sims[0]
def run(self):
self.handle_queue()
if __name__ == '__main__':
freeze_support()
queue = Queue()
w = MultiComputers('seed_1_hundred', queue)
w.start()
QueueManager.register('get_queue', callable=lambda: queue)
m = QueueManager(address=('', 8001), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
Run Code Online (Sandbox Code Playgroud)
然后运行此队列来处理模拟结果:
__author__ = 'axa'
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager
import time
class QueueManager(BaseManager):
pass
class SaveFromMultiComp(Process):
def __init__(self, sim_name, queue):
self.sim_name = sim_name
self.queue = queue
super(SaveFromMultiComp, self).__init__()
def run(self):
res_got = 0
with open('sim_type1_' + self.sim_name, 'a') as f_1:
with open('sim_type2_' + self.sim_name, 'a') as f_2:
while True:
if self.queue.qsize() > 0:
while self.queue.qsize() > 0:
res = self.queue.get()
res_got += 1
if res[0] == 1:
f_1.write(str(res[1]) + '\n')
elif res[0] == 2:
f_2.write(str(res[1]) + '\n')
print(res_got)
time.sleep(0.5)
if __name__ == '__main__':
queue = Queue()
w = SaveFromMultiComp('seed_1_hundred', queue)
w.start()
m = QueueManager(address=('', 8002), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
Run Code Online (Sandbox Code Playgroud)
这些脚本按预期处理前 ~7-800 个模拟,之后我在运行接收结果脚本的终端中收到以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
self.run()
File "C:\Python35\lib\threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "C:\Python35\lib\multiprocessing\managers.py", line 177, in accepter
t.start()
File "C:\Python35\lib\threading.py", line 844, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
Run Code Online (Sandbox Code Playgroud)
任何人都可以提供一些关于线程在何处以及如何生成的见解,每次我调用时都会生成一个新线程queue.get()或者它是如何工作的?如果有人知道我可以做什么来避免这种失败,我会非常高兴?(我正在使用Python3.5-32运行脚本)
所有迹象都表明您的系统缺少启动线程所需的资源(可能是内存,但您可能会泄漏线程或其他资源)。您可以使用操作系统监视工具(top适用于 Linux、Resource Monitor适用于 Windows)来查看线程数和内存使用情况来跟踪这一点,但我建议您使用更简单、更高效的编程模式。
虽然不是完美的比较,但您通常会看到C10K 问题,它指出等待结果的阻塞线程不能很好地扩展,并且可能容易出现这样的泄漏错误。解决方案是实现异步 IO 模式(一个阻塞线程启动其他工作线程),这在 Web 服务器中非常简单。
像 pythons 这样的框架aiohttp应该很适合你想要的。您只需要一个可以获取远程代码的 ID 和结果的处理程序。希望该框架能够为您处理扩展问题。
因此,在您的情况下,您可以保留启动代码,但在远程计算机上启动进程后,终止该线程。然后让远程代码向您的服务器发送一条 HTTP 消息,其中包含 1) 其 ID 和 2) 其结果。如果没有收到 200“OK”状态代码,请添加一些额外的代码,要求它重试,这样您的状态应该会好得多。