Fro*_*oor 3 multithreading class python-3.x
这里发布了一个解决方案来创建一个可停止的线程。但是,我在理解如何实施此解决方案时遇到了一些问题。
使用代码...
import threading
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self):
super(StoppableThread, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
Run Code Online (Sandbox Code Playgroud)
如何创建一个线程来运行每 1 秒向终端打印一次“Hello”的函数。5 秒后,我使用 .stop() 停止循环函数/线程。
我再次在理解如何实现这个停止解决方案时遇到了麻烦,这是我目前所拥有的。
import threading
import time
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self):
super(StoppableThread, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
def funct():
while not testthread.stopped():
time.sleep(1)
print("Hello")
testthread = StoppableThread()
testthread.start()
time.sleep(5)
testthread.stop()
Run Code Online (Sandbox Code Playgroud)
上面的代码创建了可以被 testthread.stop() 命令停止的线程 testthread。据我所知,这只是创建一个空线程......有没有办法创建一个运行funct()的线程,并且该线程将在我使用.stop()时结束。基本上我不知道如何实现 StoppableThread 类来将 funct() 函数作为线程运行。
常规线程函数的示例...
import threading
import time
def example():
x = 0
while x < 5:
time.sleep(1)
print("Hello")
x = x + 1
t = threading.Thread(target=example)
t.start()
t.join()
#example of a regular threaded function.
Run Code Online (Sandbox Code Playgroud)
您在原始示例中使用代码的方式存在一些问题。首先,您没有将任何构造函数参数传递给基本构造函数。这是一个问题,因为正如您在普通线程示例中所见,构造函数参数通常是必需的。你应该重写StoppableThread.__init__如下:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._stop_event = threading.Event()
Run Code Online (Sandbox Code Playgroud)
由于您使用的是 Python 3,因此您无需为super. 现在你可以做
testthread = StoppableThread(target=funct)
Run Code Online (Sandbox Code Playgroud)
这仍然不是最佳解决方案,因为funct使用外部变量testthread来停止自身。虽然这对于像您这样的小例子来说是可以的,但使用这样的全局变量通常会导致巨大的维护负担,而您不想这样做。更好的解决方案是StoppableThread为您的特定任务扩展通用类,以便您可以self正确访问:
class MyTask(StoppableThread):
def run(self):
while not self.stopped():
time.sleep(1)
print("Hello")
testthread = MyTask()
testthread.start()
time.sleep(5)
testthread.stop()
Run Code Online (Sandbox Code Playgroud)
如果您绝对不想扩展StoppableThread,则可以current_thread在任务中使用该函数,而不是读取全局变量:
def funct():
while not current_thread().stopped():
time.sleep(1)
print("Hello")
testthread = StoppableThread(target=funct)
testthread.start()
sleep(5)
testthread.stop()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3995 次 |
| 最近记录: |