Python无法启动新线程多处理

axe*_*nde 5 python multithreading network-programming multiprocessing

我正在尝试使用一组计算机来运行数百万个小型模拟。为此,我尝试在主计算机上设置两台“服务器”,一台用于将队列中的输入变量添加到网络,另一台用于处理结果。

这是将内容放入模拟变量队列的代码:

"""This script reads start parameters and calls on run_sim to run the
simulations"""
import time
from multiprocessing import Process, freeze_support, Manager, Value, Queue, current_process
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


class MultiComputers(Process):
    def __init__(self, sim_name, queue):
        self.sim_name = sim_name
        self.queue = queue
        super(MultiComputers, self).__init__()

    def get_sim_obj(self, offset, db):
        """returns a list of lists from a database query"""

    def handle_queue(self):
        self.sim_nr = 0
        sims = self.get_sim_obj()
        self.total = len(sims)
        while len(sims) > 0:
            if self.queue.qsize() > 100:
                self.queue.put(sims[0])
                self.sim_nr += 1
                print(self.sim_nr, round(self.sim_nr/self.total * 100, 2), self.queue.qsize())
                del sims[0]

    def run(self):
        self.handle_queue()

if __name__ == '__main__':
    freeze_support()
    queue = Queue()
    w = MultiComputers('seed_1_hundred', queue)
    w.start()
    QueueManager.register('get_queue', callable=lambda: queue)
    m = QueueManager(address=('', 8001), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()
Run Code Online (Sandbox Code Playgroud)

然后运行此队列来处理模拟结果:

__author__ = 'axa'
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager
import time


class QueueManager(BaseManager):
    pass


class SaveFromMultiComp(Process):
    def __init__(self, sim_name, queue):
        self.sim_name = sim_name
        self.queue = queue
        super(SaveFromMultiComp, self).__init__()

    def run(self):
        res_got = 0
        with open('sim_type1_' + self.sim_name, 'a') as f_1:
            with open('sim_type2_' + self.sim_name, 'a') as f_2:
                while True:
                    if self.queue.qsize() > 0:
                        while self.queue.qsize() > 0:
                            res = self.queue.get()
                            res_got += 1
                            if res[0] == 1:
                                f_1.write(str(res[1]) + '\n')
                            elif res[0] == 2:
                                f_2.write(str(res[1]) + '\n')
                            print(res_got)
                    time.sleep(0.5)


if __name__ == '__main__':
    queue = Queue()
    w = SaveFromMultiComp('seed_1_hundred', queue)
    w.start()
    m = QueueManager(address=('', 8002), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()
Run Code Online (Sandbox Code Playgroud)

这些脚本按预期处理前 ~7-800 个模拟,之后我在运行接收结果脚本的终端中收到以下错误:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python35\lib\multiprocessing\managers.py", line 177, in accepter
    t.start()
  File "C:\Python35\lib\threading.py", line 844, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
Run Code Online (Sandbox Code Playgroud)

任何人都可以提供一些关于线程在何处以及如何生成的见解,每次我调用时都会生成一个新线程queue.get()或者它是如何工作的?如果有人知道我可以做什么来避免这种失败,我会非常高兴?(我正在使用Python3.5-32运行脚本)

Lia*_*lly 4

所有迹象都表明您的系统缺少启动线程所需的资源(可能是内存,但您可能会泄漏线程或其他资源)。您可以使用操作系统监视工具(top适用于 Linux、Resource Monitor适用于 Windows)来查看线程数和内存使用情况来跟踪这一点,但我建议您使用更简单、更高效的编程模式。

虽然不是完美的比较,但您通常会看到C10K 问题,它指出等待结果的阻塞线程不能很好地扩展,并且可能容易出现这样的泄漏错误。解决方案是实现异步 IO 模式(一个阻塞线程启动其他工作线程),这在 Web 服务器中非常简单。

像 pythons 这样的框架aiohttp应该很适合你想要的。您只需要一个可以获取远程代码的 ID 和结果的处理程序。希望该框架能够为您处理扩展问题。

因此,在您的情况下,您可以保留启动代码,但在远程计算机上启动进程后,终止该线程。然后让远程代码向您的服务器发送一条 HTTP 消息,其中包含 1) 其 ID 和 2) 其结果。如果没有收到 200“OK”状态代码,请添加一些额外的代码,要求它重试,这样您的状态应该会好得多。