线程池类似于多处理池?

Mar*_*tin 326 python multithreading missing-features

工作线程是否有Pool类,类似于多处理模块的Pool类

我喜欢例如并行化地图功能的简单方法

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
Run Code Online (Sandbox Code Playgroud)

但是我想在没有创建新流程的开销的情况下这样做.

我知道GIL.但是,在我的用例中,该函数将是一个IO绑定的C函数,python包装器将在实际函数调用之前释放GIL.

我是否必须编写自己的线程池?

Mar*_*tin 426

我刚刚发现模块中确实 存在一个基于线程的Pool接口multiprocessing,但是它有些隐藏,没有正确记录.

它可以通过导入

from multiprocessing.pool import ThreadPool
Run Code Online (Sandbox Code Playgroud)

它是使用包装python线程的虚拟Process类实现的.可以multiprocessing.dummy文档中简要提及这个基于线程的Process类.据推测,这个虚拟模块基于线程提供整个多处理接口.

  • 我不明白为什么这堂课没有文件.如今,这样的助手类非常重要. (72认同)
  • @Wernight:它不公开主要是因为没有人提供一个补丁,提供它(或类似的)threading.ThreadPool,包括文档和测试.包含在标准库中确实是一个很好的电池,但如果没有人写它就不会发生.这种现有实现在多处理中的一个很好的优点是它应该使任何这样的线程补丁*更容易编写(http://docs.python.org/devguide/) (16认同)
  • @ daniel.gindi:[进一步阅读](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.dummy):"`multiprocessing.dummy`复制了`multiprocessing`的API,但是只不过是`threading`模块的包装器." `multiprocessing`一般是关于进程,但是为了允许在进程和线程之间切换,它们(大多数)在`multiprocessing.dummy`中复制了`multiprocessing` API,但是支持线程,而不是进程.目标是允许您执行`import multiprocessing.dummy as multiprocessing`以将基于进程的代码更改为基于线程的代码. (6认同)
  • 棒极了.我在主线程之外创建ThreadPools时遇到问题,但是一旦创建了子线程,就可以使用它们.我提出了一个问题:http://bugs.python.org/issue10015 (5认同)
  • @ brandon-rhodes我错过了ThreadPool实际列在`multiprocessing.pool .__ all__`中.所以我的评论可以调整为"因为没有人为它写过文档". (2认同)
  • @CiprianTomoiaga:您被[GIL](https://wiki.python.org/moin/GIL)所咬。CPython(参考解释器)一次只能在一个线程中执行字节码。GIL的发布是为了阻止操作(锁定获取,I / O),以及在一些第三方扩展模块中,这些扩展模块在内部对非Python类型执行大量的CPU工作,但除此之外,CPU绑定的代码也无法从线程中受益所有; 在Python 2上,它通常会变得非常慢,因为大部分时间都花在了锁争用上。Py3的浪费更少,但仍然无法从多个核心中受益。 (2认同)
  • 如果我理解正确,最佳实践是使用`from multiprocessing.dummy import Pool`。这*是*实际上是`ThreadingPool`。通过这种方式,您可以轻松地在实际的多进程池和线程池之间切换。@CiprianTomoiaga,@ShadowRanger 给出了一个很好的答案——底线,只需为您的案例使用多处理池。如果您不是来回传递重物,这是一个不错的选择。 (2认同)
  • @ daniel.gindi:`multiprocessing.dummy.Pool` /`multiprocessing.pool.ThreadPool`是同一件事,都是线程池。它们模仿进程池的“接口”,但完全在线程方面实现。重新阅读文档,您将其倒退了。 (2认同)

小智 208

在Python 3中你可以使用concurrent.futures.ThreadPoolExecutor,即:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)
Run Code Online (Sandbox Code Playgroud)

有关更多信息和示例,请参阅文档.

  • 它也被反向移植到Python 2.5-2.7 https://pypi.python.org/pypi/futures (24认同)
  • 使用“ThreadPoolExecutor”和“multiprocessing.dummy.Pool”有什么区别? (10认同)
  • 为了使用backported期货模块,运行`sudo pip install futures` (5认同)
  • 从 Python 3.9 开始/3.10 开始,`concurrent.futures` 是一个非常有问题的库。看起来它已经被没有得到适当修复的错误淹没了。也许,这个图书馆的整体前提很糟糕。我更熟悉这个库的基于进程的部分,其中有无数的原因导致池永远挂起、吞下错误和以其他方式行为不当。我会尽可能远离这个图书馆。 (3认同)

war*_*res 57

是的,它似乎(或多或少)具有相同的API.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....
Run Code Online (Sandbox Code Playgroud)

  • `ThreadPool`的导入路径与`Pool`不同.正确的导入是`来自multiprocessing.pool import ThreadPool`. (8认同)
  • 这就是我一直在寻找的。这只是一个单独的导入行和对我现有池行的一个小改动,它完美地工作。 (3认同)
  • 奇怪的是,这不是一个文档化的 API,并且 multiprocessing.pool 只是简单地提到提供 AsyncResult。但它在 2.x 和 3.x 中可用。 (2认同)

dgo*_*sen 37

对于非常简单和轻量级的东西(从这里略微修改):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()
Run Code Online (Sandbox Code Playgroud)

要支持任务完成时的回调,您只需将回调添加到任务元组即可.


小智 11

您好在Python中使用线程池,您可以使用此库:

from multiprocessing.dummy import Pool as ThreadPool
Run Code Online (Sandbox Code Playgroud)

然后使用,这个库就是这样的:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results
Run Code Online (Sandbox Code Playgroud)

线程是您想要的线程数,任务是大多数映射到服务的任务列表.

  • 我们错过了 `.close()` 和 `.join()` 调用,这导致 `.map()` 在所有线程完成之前完成。只是一个警告。 (2认同)

pel*_*los 7

另一种方法是将进程添加到线程队列池中

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)
Run Code Online (Sandbox Code Playgroud)


for*_*tor 5

这是我最终使用的结果。它是上述dgorissen类的修改版本。

文件: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)
Run Code Online (Sandbox Code Playgroud)

使用游泳池

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()
Run Code Online (Sandbox Code Playgroud)


Kas*_*hif 5

是的,有一个类似于多处理池的线程池,但是,它有点隐藏并且没有正确记录。您可以通过以下方式导入它:-

from multiprocessing.pool import ThreadPool
Run Code Online (Sandbox Code Playgroud)

我只是给你展示一个简单的例子

def test_multithread_stringio_read_csv(self):
        # see gh-11786
        max_row_range = 10000
        num_files = 100

        bytes_to_df = [
            '\n'.join(
                ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
            ).encode() for j in range(num_files)]
        files = [BytesIO(b) for b in bytes_to_df]

        # read all files in many threads
        pool = ThreadPool(8)
        results = pool.map(self.read_csv, files)
        first_result = results[0]

        for result in results:
            tm.assert_frame_equal(first_result, result) 
Run Code Online (Sandbox Code Playgroud)