Tkinter:如何使用线程来防止主事件循环"冻结"

Dir*_*uin 43 python multithreading tkinter event-loop progress-bar

我有一个小的GUI测试,带有"开始"按钮和进度条.期望的行为是:

  • 单击开始
  • 进度条振荡5秒
  • 进度条停止

观察到的行为是"开始"按钮冻结5秒钟,然后显示进度条(无振荡).

到目前为止,这是我的代码:

class GUI:
    def __init__(self, master):
        self.master = master
        self.test_button = Button(self.master, command=self.tb_click)
        self.test_button.configure(
            text="Start", background="Grey",
            padx=50
            )
        self.test_button.pack(side=TOP)

    def progress(self):
        self.prog_bar = ttk.Progressbar(
            self.master, orient="horizontal",
            length=200, mode="indeterminate"
            )
        self.prog_bar.pack(side=TOP)

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        # Simulate long running process
        t = threading.Thread(target=time.sleep, args=(5,))
        t.start()
        t.join()
        self.prog_bar.stop()

root = Tk()
root.title("Test Button")
main_ui = GUI(root)
root.mainloop()
Run Code Online (Sandbox Code Playgroud)

基于从布赖恩奥克利的信息在这里,我明白,我需要使用线程.我尝试创建一个线程,但我猜测,因为线程是从主线程中启动的,所以没有用.

我有想法放置在不同的类中的逻辑部分,以及从该类,类似于由A.罗达斯示例代码内实例化GUI 这里.

我的问题:

我无法弄清楚如何编码它,以便这个命令:

self.test_button = Button(self.master, command=self.tb_click)
Run Code Online (Sandbox Code Playgroud)

调用位于另一个类中的函数.这是一件坏事还是可能?我如何创建一个可以处理self.tb_click的第二类?我试着跟随A. Rodas的示例代码,它的工作非常精彩.但是我无法弄清楚如何在触发动作的Button小部件的情况下实现他的解决方案.

如果我应该在单个GUI类中处理线程,那么如何创建一个不干扰主线程的线程呢?

A. *_*das 56

当您在主线程中加入新线程时,它将等到线程完成,因此即使您使用多线程,GUI也会阻塞.

如果要将逻辑部分放在不同的类中,可以直接对Thread进行子类化,然后在按下按钮时启动该类的新对象.Thread的这个子类的构造函数可以接收Queue对象,然后您就可以将它与GUI部分进行通信.所以我的建议是:

  1. 在主线程中创建一个Queue对象
  2. 创建一个可以访问该队列的新线程
  3. 定期检查主线程中的队列

然后你必须解决如果用户点击两次相同按钮会发生什么的问题(它会在每次点击时产生一个新线程),但你可以通过禁用启动按钮并在你打电话后再次启用它来修复它self.prog_bar.stop().

import Queue

class GUI:
    # ...

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        self.queue = Queue.Queue()
        ThreadedTask(self.queue).start()
        self.master.after(100, self.process_queue)

    def process_queue(self):
        try:
            msg = self.queue.get(0)
            # Show result of the task if needed
            self.prog_bar.stop()
        except Queue.Empty:
            self.master.after(100, self.process_queue)

class ThreadedTask(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        time.sleep(5)  # Simulate long running process
        self.queue.put("Task finished")
Run Code Online (Sandbox Code Playgroud)

  • 是的,使用 `self.master.after(100, self.process_queue)` 你每 100 毫秒安排一次这个方法,而 `self.process_queue()` 在每次调用之间没有任何延迟地执行它。没有必要这样做,所以 `after` 是一个更好的解决方案来定期检查内容。 (2认同)

Buv*_*inJ 6

我将提交替代解决方案的基础。它本身并不特定于 Tk 进度条,但当然可以很容易地实现它。

这里有一些类允许您在 Tk 的后台运行其他任务,在需要时更新 Tk 控件,而不是锁定 gui!

这是 TkRepeatingTask 和 BackgroundTask 类:

import threading

class TkRepeatingTask():

    def __init__( self, tkRoot, taskFuncPointer, freqencyMillis ):
        self.__tk_   = tkRoot
        self.__func_ = taskFuncPointer        
        self.__freq_ = freqencyMillis
        self.__isRunning_ = False

    def isRunning( self ) : return self.__isRunning_ 

    def start( self ) : 
        self.__isRunning_ = True
        self.__onTimer()

    def stop( self ) : self.__isRunning_ = False

    def __onTimer( self ): 
        if self.__isRunning_ :
            self.__func_() 
            self.__tk_.after( self.__freq_, self.__onTimer )

class BackgroundTask():

    def __init__( self, taskFuncPointer ):
        self.__taskFuncPointer_ = taskFuncPointer
        self.__workerThread_ = None
        self.__isRunning_ = False

    def taskFuncPointer( self ) : return self.__taskFuncPointer_

    def isRunning( self ) : 
        return self.__isRunning_ and self.__workerThread_.isAlive()

    def start( self ): 
        if not self.__isRunning_ :
            self.__isRunning_ = True
            self.__workerThread_ = self.WorkerThread( self )
            self.__workerThread_.start()

    def stop( self ) : self.__isRunning_ = False

    class WorkerThread( threading.Thread ):
        def __init__( self, bgTask ):      
            threading.Thread.__init__( self )
            self.__bgTask_ = bgTask

        def run( self ):
            try :
                self.__bgTask_.taskFuncPointer()( self.__bgTask_.isRunning )
            except Exception as e: print repr(e)
            self.__bgTask_.stop()
Run Code Online (Sandbox Code Playgroud)

这是一个 Tk 测试,它演示了这些的使用。如果您想查看实际演示,只需将其附加到包含这些类的模块底部:

def tkThreadingTest():

    from tkinter import Tk, Label, Button, StringVar
    from time import sleep

    class UnitTestGUI:

        def __init__( self, master ):
            self.master = master
            master.title( "Threading Test" )

            self.testButton = Button( 
                self.master, text="Blocking", command=self.myLongProcess )
            self.testButton.pack()

            self.threadedButton = Button( 
                self.master, text="Threaded", command=self.onThreadedClicked )
            self.threadedButton.pack()

            self.cancelButton = Button( 
                self.master, text="Stop", command=self.onStopClicked )
            self.cancelButton.pack()

            self.statusLabelVar = StringVar()
            self.statusLabel = Label( master, textvariable=self.statusLabelVar )
            self.statusLabel.pack()

            self.clickMeButton = Button( 
                self.master, text="Click Me", command=self.onClickMeClicked )
            self.clickMeButton.pack()

            self.clickCountLabelVar = StringVar()            
            self.clickCountLabel = Label( master,  textvariable=self.clickCountLabelVar )
            self.clickCountLabel.pack()

            self.threadedButton = Button( 
                self.master, text="Timer", command=self.onTimerClicked )
            self.threadedButton.pack()

            self.timerCountLabelVar = StringVar()            
            self.timerCountLabel = Label( master,  textvariable=self.timerCountLabelVar )
            self.timerCountLabel.pack()

            self.timerCounter_=0

            self.clickCounter_=0

            self.bgTask = BackgroundTask( self.myLongProcess )

            self.timer = TkRepeatingTask( self.master, self.onTimer, 1 )

        def close( self ) :
            print "close"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass            
            self.master.quit()

        def onThreadedClicked( self ):
            print "onThreadedClicked"
            try: self.bgTask.start()
            except: pass

        def onTimerClicked( self ) :
            print "onTimerClicked"
            self.timer.start()

        def onStopClicked( self ) :
            print "onStopClicked"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass                        

        def onClickMeClicked( self ):
            print "onClickMeClicked"
            self.clickCounter_+=1
            self.clickCountLabelVar.set( str(self.clickCounter_) )

        def onTimer( self ) :
            print "onTimer"
            self.timerCounter_+=1
            self.timerCountLabelVar.set( str(self.timerCounter_) )

        def myLongProcess( self, isRunningFunc=None ) :
            print "starting myLongProcess"
            for i in range( 1, 10 ):
                try:
                    if not isRunningFunc() :
                        self.onMyLongProcessUpdate( "Stopped!" )
                        return
                except : pass   
                self.onMyLongProcessUpdate( i )
                sleep( 1.5 ) # simulate doing work
            self.onMyLongProcessUpdate( "Done!" )                

        def onMyLongProcessUpdate( self, status ) :
            print "Process Update: %s" % (status,)
            self.statusLabelVar.set( str(status) )

    root = Tk()    
    gui = UnitTestGUI( root )
    root.protocol( "WM_DELETE_WINDOW", gui.close )
    root.mainloop()

if __name__ == "__main__": 
    tkThreadingTest()
Run Code Online (Sandbox Code Playgroud)

关于BackgroundTask,我要强调两个要点:

1)您在后台任务中运行的函数需要采用一个函数指针,它将调用和尊重,这允许在中途取消任务 - 如果可能的话。

2) 您需要确保退出应用程序时后台任务已停止。如果您不解决这个问题,即使您的 gui 已关闭,该线程仍将运行!

  • 我对我说的话真的很有信心。Tkinter 不是线程安全的并不意味着您不能在多线程应用程序中使用它。只是您必须将同时访问 Tkinter 的线程数限制为 1(这通常由主线程决定)。[我的回答](http://stackoverflow.com/a/38555487/355230) 对另一个 Tkinter 问题有一个正在完成的示例。 (3认同)

ale*_*ksk 6

我使用了 RxPY,它有一些很好的线程函数,可以以相当干净的方式解决这个问题。没有队列,并且我提供了一个在后台线程完成后在主线程上运行的函数。这是一个工作示例:

import rx
from rx.scheduler import ThreadPoolScheduler
import time
import tkinter as tk

class UI:
   def __init__(self):
      self.root = tk.Tk()
      self.pool_scheduler = ThreadPoolScheduler(1) # thread pool with 1 worker thread
      self.button = tk.Button(text="Do Task", command=self.do_task).pack()

   def do_task(self):
      rx.empty().subscribe(
         on_completed=self.long_running_task, 
         scheduler=self.pool_scheduler
      )

   def long_running_task(self):
      # your long running task here... eg:
      time.sleep(3)
      # if you want a callback on the main thread:
      self.root.after(5, self.on_task_complete)

   def on_task_complete(self):
       pass # runs on main thread

if __name__ == "__main__":
    ui = UI()
    ui.root.mainloop()
Run Code Online (Sandbox Code Playgroud)

使用此构造的另一种方法可能更干净(取决于偏好):

tk.Button(text="Do Task", command=self.button_clicked).pack()

...

def button_clicked(self):

   def do_task(_):
      time.sleep(3) # runs on background thread
             
   def on_task_done():
      pass # runs on main thread

   rx.just(1).subscribe(
      on_next=do_task, 
      on_completed=lambda: self.root.after(5, on_task_done), 
      scheduler=self.pool_scheduler
   )
Run Code Online (Sandbox Code Playgroud)


jmi*_*cza 5

问题是 t.join() 阻塞了点击事件,主线程没有回到事件循环来处理重绘。请参阅为什么在 Tkinter 中的进程后出现 ttk Progressbar发送电子邮件时 TTK 进度条被阻止