如何避免在将来在不同时间启动(非常短的)操作时启动数百个线程

Bas*_*asj 5 python multithreading timer event-loop

do_it我使用这种方法在未来的不同时间发起几十次(不到千次)调用:

import threading
timers = []
while True:
    for i in range(20):
        t = threading.Timer(i * 0.010, do_it, [i])    # I pass the parameter i to function do_it
        t.start()
        timers.append(t)  # so that they can be cancelled if needed
    wait_for_something_else() # this can last from 5 ms to 20 seconds
Run Code Online (Sandbox Code Playgroud)

每次调用的运行时间do_it都非常快(远小于 0.1 毫秒)并且是非阻塞的。我想避免为这样一个简单的任务产生数百个新线程

我怎样才能只用一个额外的线程来处理所有do_it调用呢?

有没有一种简单的方法可以用Python来做到这一点,不需要第三方库,只有标准库?

Wil*_*lva 6

据我了解,您需要一个工作线程来处理提交的任务,不是按照提交的顺序,而是按照某种优先顺序。这似乎是线程安全的工作queue.PriorityQueue

from dataclasses import dataclass, field
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[PrioritizedItem]):
    while True:
        do_it(q.get().item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    for i in range(20):
        q.put(PrioritizedItem(priority=i * 0.010, item=i))
    wait_for_something_else()
Run Code Online (Sandbox Code Playgroud)

这段代码假设你想永远运行。q.get如果没有,可以给in添加超时thread_worker,并queue.Empty在超时抛出异常时返回。这样,在处理完所有作业并且超时已过之后,您将能够加入队列/线程。

如果您想等到将来的某个特定时间来运行任务,那就会变得有点复杂。这是一种通过在工作线程中休眠直到指定时间到达来扩展上述方法的方法,但请注意,这time.sleep仅与操作系统允许的准确度一样

from dataclasses import astuple, dataclass, field
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class TimedItem:
    when: datetime
    item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[TimedItem]):
    while True:
        when, item = astuple(q.get())
        sleep_time = (when - datetime.now()).total_seconds()
        if sleep_time > 0:
            sleep(sleep_time)
        do_it(item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    now = datetime.now()
    for i in range(20):
        q.put(TimedItem(when=now + timedelta(seconds=i * 0.010), item=i))
    wait_for_something_else()
Run Code Online (Sandbox Code Playgroud)

为了仅使用一个额外的线程来解决这个问题,我们必须在该线程中休眠,因此当工作线程休眠时,可能会出现具有更高优先级的新任务。在这种情况下,工作人员将在完成当前任务处理新的高优先级任务。上面的代码假设这种情况不会发生,根据问题描述,这似乎是合理的。如果可能发生这种情况,您可以更改睡眠代码以重复轮询优先级队列前面的任务是否到期。此类轮询方法的缺点是它会占用更多的 CPU 资源。

另外,如果您可以保证任务的相对顺序在提交给工作人员后不会改变,那么您可以将优先级队列替换为常规队列,queue.Queue以稍微简化代码。

do_it可以通过从队列中删除这些任务来取消它们。

上面的代码使用以下模拟定义进行了测试:

def do_it(x):
    print(x)

def wait_for_something_else():
    sleep(5)
Run Code Online (Sandbox Code Playgroud)

正如 smcjones 所指出的,另一种不使用额外线程的替代方法是使用 asyncio。这是一种使用 asyncio 的方法,它do_it通过以下方式在未来的特定时间调用loop.call_later

from dataclasses import dataclass, field
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[PrioritizedItem]):
    while True:
        do_it(q.get().item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    for i in range(20):
        q.put(PrioritizedItem(priority=i * 0.010, item=i))
    wait_for_something_else()
Run Code Online (Sandbox Code Playgroud)

do_it可以使用 返回的句柄取消这些任务loop.call_later

然而,这种方法需要切换程序以始终使用 asyncio,或者在单独的线程中运行 asyncio 事件循环。