The right way to limit the maximum number of threads running at once?

d33*_*tah 38 python multithreading python-multithreading

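I'd like to create a program that runs multiple lightweight threads, but limits itself to a constant, predefined number of concurrently running tasks, like this (but with no risk of a race condition):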

import threading

def f(arg):
    global running
    running += 1
    print("Spawned a thread. running=%s, arg=%s" % (running, arg))
    for i in range(100000):
        pass
    running -= 1
    print("Done")

running = 0
while True:
    if running < 8:
        arg = get_task()
        threading.Thread(target=f, args=[arg]).start()

What's the safest/fastest way to implement this?

cdh*_*wie 34

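It sounds like you want to implement the producer/consumer pattern with eight workers. Python has a Queue class for this purpose, and it is thread-safe.

Each worker should call get() on the queue to retrieve a task. This call blocks if no tasks are available, so the worker sits idle until one becomes available. The worker should then execute the task and finally call task_done() on the queue.

You put tasks in the queue by calling put() on the queue.

From the main thread, you can call join() on the queue to wait until all pending tasks have been completed.

This approach has the benefit that you are not creating and destroying threads, which is expensive. The worker threads run continuously, but they sleep while no tasks are in the queue, using zero CPU time.

(The linked documentation page has an example of this very pattern.)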
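The pattern described above could look roughly like the following. This is only a minimal sketch: the names NUM_WORKERS, tasks, and results are made up for illustration, and squaring a number stands in for the real work.

```python
import queue
import threading

NUM_WORKERS = 8  # assumed limit, matching the eight workers discussed above
tasks = queue.Queue()
results = []
results_lock = threading.Lock()

def worker():
    while True:
        item = tasks.get()              # blocks while the queue is empty
        try:
            with results_lock:
                results.append(item * item)  # placeholder for real work
        finally:
            tasks.task_done()           # always mark the item as processed

# Daemon threads, so idle workers do not keep the interpreter alive at exit.
for _ in range(NUM_WORKERS):
    threading.Thread(target=worker, daemon=True).start()

for n in range(20):
    tasks.put(n)

tasks.join()                            # wait until every queued task is done
print(sorted(results))
```

The workers here never terminate on their own; marking them as daemon threads is one simple way to let the program exit once join() returns.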

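  • Works great, but I'm wondering how to signal the threads that I've finished sending tasks? (2 upvotes)
  • Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then exits. (2 upvotes)
  • @MartinJames The suggestion about sending an 'I am finished' task, so that any thread that gets it requeues it and then exits, did not work well for me. I have a follow-up question about it here: /sf/ask/3161869161/. (2 upvotes)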
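For reference, the 'I am finished' idea from these comments might be sketched as below. STOP is a hypothetical sentinel object invented for this example, and the sketch assumes a plain FIFO queue.Queue, so the sentinel is only seen after every real task has been taken.

```python
import queue
import threading

STOP = object()   # hypothetical sentinel meaning "no more tasks"
tasks = queue.Queue()
processed = []
processed_lock = threading.Lock()

def worker():
    while True:
        item = tasks.get()
        if item is STOP:
            tasks.put(STOP)   # requeue so the next worker also sees it
            break
        with processed_lock:
            processed.append(item)

workers = [threading.Thread(target=worker) for _ in range(4)]
for t in workers:
    t.start()
for n in range(10):
    tasks.put(n)
tasks.put(STOP)               # a single sentinel is enough for all workers
for t in workers:
    t.join()
print(len(processed))
```

Note that this variant waits on the threads rather than on tasks.join(), and one sentinel is left in the queue after the last worker exits.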

Ham*_*eem 17

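A semaphore is a variable or abstract data type used to control access to a common resource by multiple processes in a concurrent system, such as a multiprogramming operating system; this can help you here.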

import threading

maximumNumberOfThreads = 8  # upper bound on simultaneously running threads
threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)

class MyThread(threading.Thread):

    def run(self):
        threadLimiter.acquire()
        try:
            self.Executemycode()
        finally:
            threadLimiter.release()

    def Executemycode(self):
        print(" Hello World!") 
        # <your code here>

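This way you can easily limit the number of threads that execute concurrently while the program runs. The variable 'maximumNumberOfThreads' defines the upper limit on the number of threads.

Credits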
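To check that a semaphore really bounds concurrency, one can count how many tasks are inside the guarded section at the same time. The sketch below is illustrative only; MAX_CONCURRENT, running, and peak are names made up for this example, and the sleep stands in for real work.

```python
import threading
import time

MAX_CONCURRENT = 3            # assumed limit for this demonstration
limiter = threading.BoundedSemaphore(MAX_CONCURRENT)
state_lock = threading.Lock()
running = 0
peak = 0

def task():
    global running, peak
    with limiter:             # acquire on entry, release on exit
        with state_lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)      # stand-in for real work
        with state_lock:
            running -= 1

threads = [threading.Thread(target=task) for _ in range(12)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrency:", peak)
```

Keep in mind that all twelve thread objects are still created and started here; the semaphore only limits how many of them are past the acquire at any moment.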

  • Thanks! This is exactly what I wanted! (2 upvotes)
  • Does not work with Python 3.7.6 on macOS. It does not limit the number of threads to 1. (2 upvotes)

Pau*_*obs 12

I ran into this same problem and spent days (2 days to be precise) getting to the correct solution using a queue. I wasted a day going down the ThreadPoolExecutor path because there is no way to limit the number of threads that thing launches! I fed it a list of 5000 files to copy and the code went non-responsive once it got up to about 1500 concurrent file copies running all at once. The max_workers parameter on the ThreadPoolExecutor only controls how many workers are spinning up threads not how many threads get spun up.

Ok, anyway, here is a very simple example of using a Queue for this:

import threading, time, random
from queue import Queue, Empty

jobs = Queue()

def do_stuff(q):
    while True:
        try:
            # get_nowait() avoids the race between empty() and a blocking get()
            value = q.get_nowait()
        except Empty:
            break
        time.sleep(random.randint(1, 10))
        print(value)
        q.task_done()

for i in range(10):
    jobs.put(i)

for i in range(3):
    worker = threading.Thread(target=do_stuff, args=(jobs,))
    worker.start()

print("waiting for queue to complete", jobs.qsize(), "tasks")
jobs.join()
print("all done")

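  • Divij, the max_workers parameter on ThreadPoolExecutor only controls how many workers are spinning up threads, not how many threads get spun up. If you set it to 1, you get single-threaded performance. If you set it to 2 and you have a queue of, say, several thousand long-running tasks, those two workers start spinning up threads and don't stop until they have spun up a thread for every item. If those tasks are competing for the same resource, such as memory, storage, or network, you will have a big problem on your hands. (3 upvotes)
  • You can limit the number of threads it launches at once as follows: ```ThreadPoolExecutor(max_workers=10)```, or `20`, or `30`, etc. (2 upvotes)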
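Since the answer and these comments disagree about what max_workers does, a quick measurement may help. The sketch below records the peak number of simultaneously running tasks and the distinct worker thread names; running, peak, and thread_names are names invented for this example. It is only a small experiment, not a claim about what happened in the 5000-file case above.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0
peak = 0
thread_names = set()

def task(_):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
        thread_names.add(threading.current_thread().name)
    time.sleep(0.01)          # stand-in for real work
    with lock:
        running -= 1

with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit 100 tasks; list() forces iteration over all results.
    list(executor.map(task, range(100)))

print("peak:", peak, "distinct threads:", len(thread_names))
```

In this experiment the executor never runs more than max_workers tasks at once. What is not bounded is its internal queue of pending work, so submitting thousands of items up front still holds all of them in memory.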

Kir*_*ser 7

The most common way I've seen this written is:

threads = [threading.Thread(target=f) for _ in range(8)]
for thread in threads:
    thread.start()
...
for thread in threads:
    thread.join()

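If you want to maintain a fixed-size pool of running threads that process short-lived tasks rather than ask for new work, consider a solution built around Queues, like "How to wait until only the first thread is finished in Python".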


Vid*_*ya 7

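A simple way to limit the maximum number of threads is to poll threading.active_count() (spelled threading.activeCount() in older code; that alias is deprecated since Python 3.10):

import threading, time

maxthreads = 10

def do_stuff(i):
    print(i)
    print("Total Active threads are {0}".format(threading.active_count()))
    time.sleep(20)

count = 0
while True:
    # active_count() includes the main thread
    if threading.active_count() <= maxthreads:
        worker = threading.Thread(target=do_stuff, args=(count,))
        worker.start()
        count += 1
    else:
        time.sleep(0.1)  # avoid busy-waiting while the pool is full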


aba*_*ert 5

It would be much easier to implement this as a thread pool or executor, using either multiprocessing.dummy.Pool or concurrent.futures.ThreadPoolExecutor (or, if using Python 2.x, the backport futures). For example:

import concurrent.futures

def f(arg):
    print("Started a task. arg=%s" % (arg,))
    for i in range(100000):
        pass
    print("Done")

with concurrent.futures.ThreadPoolExecutor(8) as executor:
    while True:
        arg = get_task()
        executor.submit(f, arg)

Of course, if you can change the pull-model get_task to a push-model get_tasks that, for example, yields tasks one at a time, this is even simpler:

with concurrent.futures.ThreadPoolExecutor(8) as executor:
    for arg in get_tasks():
        executor.submit(f, arg)

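When you run out of tasks (e.g., get_task raises an exception, or get_tasks runs dry), this automatically tells the executor to stop after it drains the queue, waits for it to stop, and cleans up everything.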
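The multiprocessing.dummy.Pool alternative mentioned at the top of this answer is not shown, so here is a minimal sketch of it. The function f below is only a placeholder that doubles its argument, and the pool size of 8 mirrors the question.

```python
# multiprocessing.dummy exposes the multiprocessing Pool API backed by threads.
from multiprocessing.dummy import Pool

def f(arg):
    return arg * 2   # placeholder task

with Pool(8) as pool:
    # map() blocks until all results are ready and preserves input order.
    results = pool.map(f, range(20))

print(results)
```

Because map() only returns once every result is available, leaving the with block afterwards is safe even though it terminates the pool.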


Cir*_*四事件 5

concurrent.futures.ThreadPoolExecutor.map

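concurrent.futures.ThreadPoolExecutor was mentioned at /sf/answers/1355919771/; here is an example of the map method, which is often the most convenient one.

.map() is a parallel version of map(): it reads all the input immediately, then runs the tasks in parallel, and returns the results in the same order as the input.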

Usage:

./concurrent_map_exception.py [nproc [min [max]]]

concurrent_map_exception.py

import concurrent.futures
import sys
import time

def my_func(i):
    time.sleep((abs(i) % 4) / 10.0)
    return 10.0 / i

def my_get_work(min_, max_):
    for i in range(min_, max_):
        print('my_get_work: {}'.format(i))
        yield i

# CLI.
argv_len = len(sys.argv)
if argv_len > 1:
    nthreads = int(sys.argv[1])
    if nthreads == 0:
        nthreads = None
else:
    nthreads = None
if argv_len > 2:
    min_ = int(sys.argv[2])
else:
    min_ = 1
if argv_len > 3:
    max_ = int(sys.argv[3])
else:
    max_ = 100

# Action.
with concurrent.futures.ProcessPoolExecutor(max_workers=nthreads) as executor:
    for input, output in zip(
        my_get_work(min_, max_),
        executor.map(my_func, my_get_work(min_, max_))
    ):
        print('result: {} {}'.format(input, output))

GitHub upstream.

For example:

./concurrent_map_exception.py 1 1 5

gives:

my_get_work: 1
my_get_work: 2
my_get_work: 3
my_get_work: 4
my_get_work: 1
result: 1 10.0
my_get_work: 2
result: 2 5.0
my_get_work: 3
result: 3 3.3333333333333335
my_get_work: 4
result: 4 2.5

and:

./concurrent_map_exception.py 2 1 5

gives the same output but runs faster because we now have 2 processes, and:

./concurrent_map_exception.py 1 -5 5

gives:

my_get_work: -5
my_get_work: -4
my_get_work: -3
my_get_work: -2
my_get_work: -1
my_get_work: 0
my_get_work: 1
my_get_work: 2
my_get_work: 3
my_get_work: 4
my_get_work: -5
result: -5 -2.0
my_get_work: -4
result: -4 -2.5
my_get_work: -3
result: -3 -3.3333333333333335
my_get_work: -2
result: -2 -5.0
my_get_work: -1
result: -1 -10.0
my_get_work: 0
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in _process_chunk
    return [fn(*args) for args in chunk]
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in <listcomp>
    return [fn(*args) for args in chunk]
  File "./concurrent_map_exception.py", line 24, in my_func
    return 10.0 / i
ZeroDivisionError: float division by zero
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "./concurrent_map_exception.py", line 52, in <module>
    executor.map(my_func, my_get_work(min_, max_))
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
ZeroDivisionError: float division by zero

So notice how it stops immediately on an exception.
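The same behaviour can be observed with ThreadPoolExecutor.map in a smaller form. This sketch reuses the my_func idea from the script above; the seen and error names are invented for the example.

```python
import concurrent.futures

def my_func(i):
    return 10.0 / i

seen = []
error = None
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    try:
        # Results come back in input order; the failing input re-raises here.
        for value in executor.map(my_func, [5, 2, 0, 1]):
            seen.append(value)
    except ZeroDivisionError as exc:
        error = exc

print(seen, type(error).__name__)
```

The exception surfaces at the position of the failing input while iterating the results. Tasks that were already started may still have run in the background, so "stops" refers to result consumption rather than to the workers themselves.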

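Queue example with error handling

Queue was mentioned at /sf/answers/1355891421/, but here is a full example.

Design goals:

  • the input function does not need to be modified
  • limit the number of threads
  • queue sizes closely follow the number of threads
  • fetch input only as needed, not everything upfront
  • if an error happens, optionally stop soon afterwards
  • if an exception is raised in the worker function, show the stack trace clearly

concurrent.futures.ThreadPoolExecutor is the nicest interface currently available in the stdlib that I've seen. However, I could not find how to do all of the following:

  • make it feed input little by little
  • fail immediately on error
  • accept functions with multiple arguments

because:

  • .map(): reads all inputs at once, and func can only take a single argument
  • .submit(): .shutdown() executes until all futures finish, and there is no blocking .submit() on max current work items. So how to avoid an ugly .cancel() loop over all futures after the first failure?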
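One workaround sometimes used for the missing blocking .submit() is to guard the executor with a semaphore. The sketch below is not part of the stdlib API: BoundedExecutor is a hypothetical wrapper name, and bound is an assumed parameter for the maximum number of pending work items.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BoundedExecutor:
    """Hypothetical wrapper: submit() blocks once `bound` tasks are pending."""

    def __init__(self, max_workers, bound):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._slots = threading.BoundedSemaphore(bound)

    def submit(self, fn, *args, **kwargs):
        self._slots.acquire()             # blocks while the pool is saturated
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()         # give the slot back if submit failed
            raise
        # Free the slot when the task finishes, whether it succeeded or not.
        future.add_done_callback(lambda _future: self._slots.release())
        return future

    def shutdown(self):
        self._executor.shutdown(wait=True)

def square(x):
    time.sleep(0.01)                      # stand-in for real work
    return x * x

pool = BoundedExecutor(max_workers=2, bound=2)
futures = [pool.submit(square, i) for i in range(10)]
pool.shutdown()
results = [f.result() for f in futures]
print(results)
```

This only addresses the "feed input little by little" goal; failing fast on the first error would still need extra logic on top, which is what the implementation below sets out to do.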

Without further ado, here is my implementation. Test cases follow at the end of the script under __name__ == '__main__':

thread_pool.py

#!/usr/bin/env python3

'''
This file is MIT Licensed because I'm posting it on Stack Overflow:
/sf/ask/1355880711/#55263676
'''

from typing import Any, Callable, Dict, Iterable, Union
import os
import queue
import sys
import threading
import time
import traceback

class ThreadPoolExitException(Exception):
    '''
    An object of this class may be raised by output_handler_function to
    request early termination.

    It is also raised by submit() if submit_raise_exit=True.
    '''
    pass

class ThreadPool:
    '''
    Start a pool of a limited number of threads to do some work.

    This is similar to the stdlib concurrent, but I could not find
    how to reach all my design goals with that implementation:

    * the input function does not need to be modified
    * limit the number of threads
    * queue sizes closely follow number of threads
    * if an exception happens, optionally stop soon afterwards

    This class form allows to use your own while loops with submit().

    Exit soon after the first failure happens:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_print
    ....

    Sample output:

    ....
    {'i': -9} -1.1111111111111112 None
    {'i': -8} -1.25 None
    {'i': -10} -1.0 None
    {'i': -6} -1.6666666666666667 None
    {'i': -7} -1.4285714285714286 None
    {'i': -4} -2.5 None
    {'i': -5} -2.0 None
    {'i': -2} -5.0 None
    {'i': -3} -3.3333333333333335 None
    {'i': 0} None ZeroDivisionError('float division by zero')
    {'i': -1} -10.0 None
    {'i': 1} 10.0 None
    {'i': 2} 5.0 None
    work_function or handle_output raised:
    Traceback (most recent call last):
    File "thread_pool.py", line 181, in _func_runner
        work_function_return = self.work_function(**work_function_input)
    File "thread_pool.py", line 281, in work_function_maybe_raise
        return 10.0 / i
    ZeroDivisionError: float division by zero
    work_function_input: {'i': 0}
    work_function_return: None
    ....

    Don't exit after first failure, run until end:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_print_no_exit
    ....

    Store results in a queue for later inspection instead of printing immediately,
    then print everything at the end:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_queue
    ....

    Exit soon after the handle_output raise.

    ....
    python3 thread_pool.py 2 -10 20 handle_output_raise
    ....

    Relying on this interface to abort execution is discouraged, this should
    usually only happen due to a programming error in the handler.

    Test that the argument called "thread_id" is passed to work_function and printed:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_print thread_id
    ....

    Test with ThreadPoolExitException and submit_raise_exit=True: same behaviour as
    handle_output_print, except for the different exit cause report:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_raise_exit_exception
    ....
    '''
    def __init__(
        self,
        work_function: Callable,
        handle_output: Union[Callable[[Any,Any,Exception],Any],None] = None,
        nthreads: Union[int,None] = None,
        thread_id_arg: Union[str,None] = None,
        submit_raise_exit: bool = False
    ):
        '''
        Start in a thread pool immediately.

        join() must be called afterwards at some point.

        :param work_function: main work function to be evaluated.
        :param handle_output: called on work_function return values as they
            are returned.

            The function signature is:

            ....
            handle_output(
                work_function_input: Union[Dict,None],
                work_function_return,
                work_function_exception: Exception
            ) -> Union[Exception,None]
            ....

            where work_function_exception is the exception that work_function raised,
            or None otherwise.

            The first non-None return value of a call to this function is returned by
            submit(), get_handle_output_result() and join().

            The intended semantic for this, is to return:

            *   on success:
            ** None to continue execution
            ** ThreadPoolExitException() to request stop execution
            * if work_function_input or work_function_exception raise:
            ** the exception raised

            The ThreadPool user can then optionally terminate execution early on error
            or request with either:

            * an explicit submit() return value check + break if a submit loop is used
            * `with` + submit_raise_exit=True

            Default: a handler that just returns `exception`, which can normally be used
            by the submit loop to detect an error and exit immediately.
        :param nthreads: number of threads to use. Default: nproc.
        :param thread_id_arg: if not None, set the argument of work_function with this name
            to a 0-indexed thread ID. This allows function calls to coordinate
            usage of external resources such as files or ports.
        :param submit_raise_exit: if True, submit() raises ThreadPoolExitException() if
            get_handle_output_result() is not None.
        '''
        self.work_function = work_function
        if handle_output is None:
            handle_output = lambda input, output, exception: exception
        self.handle_output = handle_output
        if nthreads is None:
            nthreads = len(os.sched_getaffinity(0))
        self.thread_id_arg = thread_id_arg
        self.submit_raise_exit = submit_raise_exit
        self.nthreads = nthreads
        self.handle_output_result = None
        self.handle_output_result_lock = threading.Lock()
        self.in_queue = queue.Queue(maxsize=nthreads)
        self.threads = []
        for i in range(self.nthreads):
            thread = threading.Thread(
                target=self._func_runner,
                args=(i,)
            )
            self.threads.append(thread)
            thread.start()

    def __enter__(self):
        '''
        __exit__ automatically calls join() for you.

        This is cool because it automatically ends the loop if an exception occurs.

        But don't forget that errors may happen after the last submit was called, so you
        likely want to check for that with get_handle_output_result() after the with.
        '''
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        self.join()
        return exception_type is ThreadPoolExitException

    def _func_runner(self, thread_id):
        while True:
            work_function_input = self.in_queue.get(block=True)
            if work_function_input is None:
                break
            if self.thread_id_arg is not None:
                work_function_input[self.thread_id_arg] = thread_id
            try:
                work_function_exception = None
                work_function_return = self.work_function(**work_function_input)
            except Exception as e:
                work_function_exception = e
                work_function_return = None
            handle_output_exception = None
            try:
                handle_output_return = self.handle_output(
                    work_function_input,
                    work_function_return,
                    work_function_exception
                )
            except Exception as e:
                handle_output_exception = e
            handle_output_result = None
            if handle_output_exception is not None:
                handle_output_result = handle_output_exception
            elif handle_output_return is not None:
                handle_output_result = handle_output_return
            if handle_output_result is not None:
                with self.handle_output_result_lock:
                    # Check under the lock so that only the first result is stored.
                    if self.handle_output_result is None:
                        self.handle_output_result = (
                            work_function_input,
                            work_function_return,
                            handle_output_result
                        )
            self.in_queue.task_done()

    @staticmethod
    def exception_traceback_string(exception):
        '''
        Helper to get the traceback from an exception object.
        This is usually what you want to print if an error happens in a thread:
        /sf/ask/259187281/#56199295
        '''
        return ''.join(traceback.format_exception(
            None, exception, exception.__traceback__)
        )

    def get_handle_output_result(self):
        '''
        :return: if a handle_output call has raised previously, return a tuple:

            ....
            (work_function_input, work_function_return, exception_raised)
            ....

            corresponding to the first such raise.

            Otherwise, if a handle_output returned non-None, a tuple:

            (work_function_input, work_function_return, handle_output_return)

            Otherwise, None.
        '''
        return self.handle_output_result

    def join(self):
        '''
        Request all threads to stop after they finish currently submitted work.

        :return: same as get_handle_output_result()
        '''
        for thread in range(self.nthreads):
            self.in_queue.put(None)
        for thread in self.threads:
            thread.join()
        return self.get_handle_output_result()

    def submit(
        self,
        work_function_input: Union[Dict,None] =None
    ):
        '''
        Submit work. Block if there is already enough work scheduled (~nthreads).

        :return: the same as get_handle_output_result
        '''
        handle_output_result = self.get_handle_output_result()
        if handle_output_result is not None and self.submit_raise_exit:
            raise ThreadPoolExitException()
        if work_function_input is None:
            work_function_input = {}
        self.in_queue.put(work_function_input)
        return handle_output_result

if __name__ == '__main__':
    def get_work(min_, max_):
        '''
        Generate simple range work for work_function.
        '''
        for i in range(min_, max_):
            yield {'i': i}

    def work_function_maybe_raise(i):
        '''
        The main function that will be evaluated.

        It sleeps to simulate an IO operation.
        '''
        time.sleep((abs(i) % 4) / 10.0)
        return 10.0 / i

    def work_function_get_thread(i, thread_id):
        time.sleep((abs(i) % 4) / 10.0)
        return thread_id

    def handle_output_print(input, output, exception):
        '''
        Print outputs and exit immediately on failure.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))
        return exception

    def handle_output_print_no_exit(input, output, exception):
        '''
        Print outputs, don't exit on failure.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))

    out_queue = queue.Queue()
    def handle_output_queue(input, output, exception):
        '''
        Store outputs in a queue for later usage.
        '''
        global out_queue
        out_queue.put((input, output, exception))
        return exception

    def handle_output_raise(input, output, exception):
        '''
        Raise if input == 0, to test that execution
        stops nicely if this raises.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))
        if input['i'] == 0:
            raise Exception

    def handle_output_raise_exit_exception(input, output, exception):
        '''
        Return a ThreadPoolExitException() if input == -5.
        Return the work_function exception if it raised.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))
        if exception:
            return exception
        if output == 10.0 / -5:
            return ThreadPoolExitException()

    # CLI arguments.
    argv_len = len(sys.argv)
    if argv_len > 1:
        nthreads = int(sys.argv[1])
        if nthreads == 0:
            nthreads = None
    else:
        nthreads = None
    if argv_len > 2:
        min_ = int(sys.argv[2])
    else:
        min_ = 1
    if argv_len > 3:
        max_ = int(sys.argv[3])
    else:
        max_ = 100
    if argv_len > 4:
        handle_output_function_string = sys.argv[4]
    else:
        handle_output_function_string = 'handle_output_print'
    handle_output = eval(handle_output_function_string)
    if argv_len > 5:
        work_function = work_function_get_thread
        thread_id_arg = sys.argv[5]
    else:
        work_function = work_function_maybe_raise
        thread_id_arg = None

    # Action.
    if handle_output is handle_output_raise_exit_exception:
        # `with` version with implicit join and submit raise
        # immediately when desired with ThreadPoolExitException.
        #
        # This is the more safe and convenient and DRY usage if
        # you can use `with`, so prefer it generally.
        with ThreadPool(
            work_function,
            handle_output,
            nthreads,
            thread_id_arg,
            submit_raise_exit=True
        ) as my_thread_pool:
            for work in get_work(min_, max_):
                my_thread_pool.submit(work)
        handle_output_result = my_thread_pool.get_handle_output_result()
    else:
        # Explicit error checking in submit loop to exit immediately
        # on error.
        my_thread_pool = ThreadPool(
            work_function,
            handle_output,
            nthreads,
            thread_id_arg,
        )
        for work_function_input in get_work(min_, max_):
            handle_output_result = my_thread_pool.submit(work_function_input)
            if handle_output_result is not None:
                break
        handle_output_result = my_thread_pool.join()
    if handle_output_result is not None:
        work_function_input, work_function_return, exception = handle_output_result
        if type(exception) is ThreadPoolExitException:
            print('Early exit requested by handle_output with ThreadPoolExitException:')
        else:
            print('work_function or handle_output raised:')
            print(ThreadPool.exception_traceback_string(exception), end='')
        print('work_function_input: {!r}'.format(work_function_input))
        print('work_function_return: {!r}'.format(work_function_return))
    if handle_output == handle_output_queue:
        while not out_queue.empty():
            print(out_queue.get())

GitHub upstream.

Tested in Python 3.7.3.

