延迟调用函数

Moo*_*ama 5 python multithreading job-scheduling

我收到了这个面试练习题。

实现一个作业调度程序,它接受函数 f 和整数 n,并在 n 毫秒后调用 f。

我有一个非常简单的解决方案:

import time

def schedulerX(f,n):
    time.sleep(0.001*n)
    f
Run Code Online (Sandbox Code Playgroud)

然而,建议的解决方案更加详细,如下所示。我不明白所有这些额外代码背后的目的是什么。请赐教。

from time import sleep
import threading

class Scheduler:
    def __init__(self):
        self.fns = [] # tuple of (fn, time)
        t = threading.Thread(target=self.poll)
        t.start()

    def poll(self):
        while True:
            now = time() * 1000
            for fn, due in self.fns:
                if now > due:
                    fn()
            self.fns = [(fn, due) for (fn, due) in self.fns if due > now]
            sleep(0.01)

    def delay(self, f, n):
        self.fns.append((f, time() * 1000 + n))
Run Code Online (Sandbox Code Playgroud)

小智 6

正如其他人指出的那样,您的解决方案是“阻塞”的:它会阻止在等待运行时发生其他任何事情。建议的解决方案的目的是让您安排工作,然后同时继续处理其他事情。

至于建议代码正在做什么的解释:

您首先创建一个Scheduler,它将启动自己的线程,该线程在后台有效运行,并且将运行作业。

scheduler = Scheduler()
Run Code Online (Sandbox Code Playgroud)

在您的代码中,您可以安排您想要的任何作业,而不必等待它们运行:

def my_recurring_job():
    # Do some stuff in the background, then re-run this job again
    # in one second.

    ### Do some stuff ###

    scheduler.delay(my_recurring_job, 1000)

scheduler.delay(lambda: print("5 seconds passed!"), 5 * 1000)
scheduler.delay(lambda: print("2 hours passed!"), 2 * 60 * 60 * 1000)
scheduler.delay(my_recurring_job, 1000)

# You can keep doing other stuff without waiting
Run Code Online (Sandbox Code Playgroud)

调度程序的线程只是在其poll方法中永远循环,运行所有到了时间的作业,然后休眠 0.01 秒并再次检查。代码中有一个小错误,如果 now == due,则作业将不会运行,但也不会保留以供以后使用。应该是if now >= due:相反。

更高级的调度程序可能会使用 athreading.Condition而不是每秒轮询 100 次:

import threading
from time import time

class Scheduler:
    def __init__(self):
        self.fns = [] # tuple of (fn, time)

        # The lock prevents 2 threads from messing with fns at the same time;
        # also lets us use Condition
        self.lock = threading.RLock()

        # The condition lets one thread wait, optionally with a timeout,
        # and lets other threads wake it up
        self.condition = threading.Condition(self.lock)

        t = threading.Thread(target=self.poll)
        t.start()

    def poll(self):
        while True:
            now = time() * 1000

            with self.lock:
                # Prevent the other thread from adding to fns while we're sorting
                # out the jobs to run now, and the jobs to keep for later

                to_run = [fn for fn, due in self.fns if due <= now]
                self.fns = [(fn, due) for (fn, due) in self.fns if due > now]

            # Run all the ready jobs outside the lock, so we don't keep it
            # locked longer than we have to
            for fn in to_run:
                fn()

            with self.lock:
                if not self.fns:
                    # If there are no more jobs, wait forever until a new job is 
                    # added in delay(), and notify_all() wakes us up again
                    self.condition.wait()
                else:
                    # Wait only until the soonest next job's due time.
                    ms_remaining = min(due for fn, due in self.fns) - time()*1000
                    if ms_remaining > 0:
                        self.condition.wait(ms_remaining / 1000)

    def delay(self, f, n):
        with self.lock:
            self.fns.append((f, time() * 1000 + n))

            # If the scheduler thread is currently waiting on the condition,
            # notify_all() will wake it up, so that it can consider the new job's
            # due time.
            self.condition.notify_all()
Run Code Online (Sandbox Code Playgroud)


gmd*_*mds 0

有一些差异(理论上)。

我认为,第一也是最重要的是,您的解决方案可以有效地一次仅安排一项功能。举例来说,假设您想f1在 10 毫秒后运行一个函数,并f2在 10 毫秒后运行另一个函数。

您将无法轻松做到这一点,因为类似的事情schedulerX(f1, 10); schedulerX(f2, 10)在开始等待之前等待f1f2完成运行。如果f1需要一个小时,你的日程安排f2将完全错误。

显然,第二个版本的目的是让计时器和每个函数在单独的线程中运行,以便一个函数调用不会阻塞另一个函数调用。

然而,正如其他人在评论中指出的那样,导入是错误的,list即使问题规范说是一个函数,它也需要一些函数,并且它实际上并不按照我描述的方式工作,所以或多或少,没有什么区别。