如何正确终止正在阻塞的线程(Lparallel Common Lisp)

dav*_*ugh 3 multithreading common-lisp blockingqueue

在Lparallel API中,终止所有线程任务的推荐方法是使用停止内核(lparallel:end-kernel)。但是,当线程正在阻塞时(例如,(pop-queue queue1)等待某个项目出现在队列中),当内核停止时,该线程仍将处于活动状态。在这种情况下(至少在SBCL中),内核关闭偶尔(但并非每次)失败,并显示以下信息:

debugger invoked on a SB-KERNEL:BOUNDING-INDICES-BAD-ERROR in thread
#<THREAD "lparallel" RUNNING {1002F04973}>:
  The bounding indices 1 and NIL are bad for a sequence of length 0.
See also:
  The ANSI Standard, Glossary entry for "bounding index designator"
  The ANSI Standard, writeup for Issue SUBSEQ-OUT-OF-BOUNDS:IS-AN-ERROR

debugger invoked on a SB-SYS:INTERACTIVE-INTERRUPT in thread
#<THREAD "main thread" RUNNING {10012E0613}>:
  Interactive interrupt at #x1001484328.
Run Code Online (Sandbox Code Playgroud)

我假设这与阻塞线程无法正确终止有关。关闭内核之前,应该如何正确终止阻塞线程?(API表示kill-tasks仅应在特殊情况下使用,我认为这种情况不适用于这种“正常”关闭情况。)

cor*_*ump 5

杀死线程的问题在于,当线程可能处于任何未知状态时,线程可能会发生在任何地方。安全终止线程的唯一方法是让其正常关闭,这意味着您希望在正常操作期间,线程有一种方法可以知道它应该停止工作。然后,您可以正确清理资源,关闭数据库,释放外部指针,记录所有内容,...

您正在使用的队列的操作可能会超时,这是一种简单而安全的方法,可确保您避免永远阻塞并正确退出。但这不是唯一的选择(除了下面显示的以外,您还可以使用它们)。

 共享/全局标志

发生超时或收到消息时,请检查一个全局布尔变量(或在所有感兴趣的线程之间共享的布尔变量)。这也是退出的一种简单方法,可以被多个线程读取。但是,这是并发访问,因此您应该使用锁或原子操作(http://www.sbcl.org/manual/#Atomic-Operations),例如,使用defglobal带有fixfix类型的atomic-incf,等等。

 控制讯息

在队列中发送控制数据,并使用它们来确定如何正常关闭,如何沿管道传播信息或如何重新启动。这是安全的(只是传递消息),并且允许您在线程中实现任何类型的控制。

(defpackage :so (:use :cl :bt :lparallel.queue))
(in-package :so)
Run Code Online (Sandbox Code Playgroud)

让我们定义两个服务。

第一个回显其输入:

(defun echo (in out)
  (lambda ()
    (loop
      for value = (pop-queue in)
      do (push-queue value out)
      until (eq value :stop))))
Run Code Online (Sandbox Code Playgroud)

请注意,在给定:stop输入后,预期如何正确完成,以及如何将:stop消息传播到其输出队列。

第二个线程将执行模块化加法,并且在两次请求之间休眠:

(defun modulo-adder (x m in out)
  (lambda ()
    (loop
      for value = (progn (sleep 0.02)
                         (pop-queue in))
      do (push-queue (typecase value
                       (keyword value)
                       (number (mod (+ x value) m)))
                     out)
      until (eq value :stop))))
Run Code Online (Sandbox Code Playgroud)

创建队列:

(defparameter *q1* (make-queue))
(defparameter *q2* (make-queue))
Run Code Online (Sandbox Code Playgroud)

创建线程:

(progn
  (bt:make-thread (echo *q1* *q2*) :name "echo")
  (bt:make-thread (modulo-adder 5 1024 *q2* *q1*) :name "adder"))
Run Code Online (Sandbox Code Playgroud)

两个线程以循环方式相互连接,从而形成无限的附加循环。当前在线程之间没有交换任何值,您可以看到它们以例如slime-list-threads或任何其他实现提供的方式运行。无论如何都会(bt:all-threads)返回一个列表。

slime-list-threads

10 adder                          Running 
11 echo                           Running 
...
Run Code Online (Sandbox Code Playgroud)

添加一个项目,现在线程之间可以无限交换数据:

(push-queue 10 *q1*)
Run Code Online (Sandbox Code Playgroud)

等待,然后同时停止它们:

(push-queue :stop *q1*)
Run Code Online (Sandbox Code Playgroud)

两个线程都正常停止(在线程列表中不再可见)。我们可以检查队列中剩余的内容(结果因一项测试而异):

(list (try-pop-queue *q1*)
      (try-pop-queue *q2*))
(99 NIL)

(list (try-pop-queue *q1*)
      (try-pop-queue *q2*))
(:STOP NIL)

(list (try-pop-queue *q1*)
      (try-pop-queue *q2*))
(NIL NIL)
Run Code Online (Sandbox Code Playgroud)

中断线程

您创建了一个由消息或全局标志控制的服务,但是您遇到了一个错误,线程挂起。除了要杀死它并失去一切之外,您至少希望适当地释放线程堆栈。这也很危险,但是您可以在当前正在运行的任何地方bt:interrupt停止线程并执行函数。

(define-condition stop () ())
(defun signal-stop ()
  (signal 'stop))

(defun endless ()
  (let ((output *standard-output*))
    (lambda ()
      (print "START" output)
      (unwind-protect (handler-case (loop)
                        (stop ()
                          (print "INTERRUPTED" output)))
        (print "STOP" output)))))
Run Code Online (Sandbox Code Playgroud)

启动它:

(bt:make-thread (endless) :name "loop")
Run Code Online (Sandbox Code Playgroud)

打印"START"和循环。然后我们中断它:

(bt:interrupt-thread (find "loop"
                           (bt:all-threads)
                           :test #'string=
                           :key #'bt:thread-name)
                     #'signal-stop)
Run Code Online (Sandbox Code Playgroud)

打印以下内容:

"INTERRUPTED" 
"STOP" 
Run Code Online (Sandbox Code Playgroud)

如果线程被杀死,则不会打印这些消息,但是请注意,鉴于中断的随机性,您仍然可以设法破坏数据。此外,它还可以取消阻止诸如sleep或的调用pop-queue