Ste*_*ano 7 python multithreading asynchronous threadpool node.js
我需要运行int f(int i)带有10_000参数的函数,由于I / O时间的原因,该过程大约需要1秒钟才能执行。
在Python之类的语言中,我可以使用线程(或者async/await,我知道,但稍后再讨论)来并行化此任务。
如果我希望始终有10个正在运行的线程,并在它们之间分配任务,则可以使用ThreadingPool:
def f(p):
    x = [...]
    return x
p = ThreadPool()
xs = p.map(f, range(10_000))
但是它是如何工作的呢?如果我想使用类似的东西来实现,比如说NodeJS和f = http("www.google.com", callback),我应该从哪里开始呢?解决此类问题的算法是什么?
同样,我想同时获得10个请求,当一个请求完成时,下一个请求应该开始。  
queue = ["www.google.com", "www.facebook.com"]
var f = function(url) {
  http.get(url, (e) => {
    const newUrl = queue.pop();
    f(newUrl);
  });
};
for (var i = 0; i < 10; i++) {
  f(queue.pop());
}
不确定 ThreadPool 和其他库是如何实现的,但这里有一个提示:使用队列来计算正在运行的任务/线程数。
我没有尝试这段代码,但它可以给你一个想法:我们创建一个线程,每 0.2 秒检查一次是否应该启动另一个线程。
然而,这意味着大量的上下文切换,并且可能效率不高。
class Pool:
    def __init__(self, func: Callable, params: list, thread_max = 10):
        self.func = func
        self.params = params
        self.running = 0
        self.finished = []
        self.thread_max = thread_max
        self.threads = []
    def start(self):
        Thread(target=check, args=(0.2)).start()
    def check(self, t_sleep=0.5):
        done = False
        while not done:
            sleep(t_sleep)
            # first check for finished threads
            for t in threads:
                if not t.isAlive():
                    # do something with return value
                    # ...
                    self.threads.remove(t)
            if not len(self.params): # mean there is no more task left to LAUNCH
                done = len(self.threads) # gonna be 0 when every tasks is COMPLETE
                continue # avoid the next part (launching thread)
            # now start some threads if needed
            while len(self.threads) < self.thread_max:
                arg = self.params.pop()
                thread = Thread(target=self.func, args=(arg, ))
                threads.insert(thread)
                thread.start()
| 归档时间: | 
 | 
| 查看次数: | 303 次 | 
| 最近记录: |