在Lparallel库中使用队列(通用Lisp)

dav*_*ugh 3 multithreading common-lisp message-queue

lparallel库中队列的基本讨论,网址https://z0ltan.wordpress.com/2016/09/09/basic-concurrency-and-parallelism-in-common-lisp-part-4a-parallelism-using-lparallel- basics /#channels说,该队列“启用在工作线程之间传递消息”。下面的测试使用一个共享队列来协调主线程和从属线程,其中主线程在退出之前只是等待下一个线程的完成:

(defun foo (q)
  (sleep 1)
  (lparallel.queue:pop-queue q))  ;q is now empty

(defun test ()
  (setf lparallel:*kernel* (lparallel:make-kernel 1))
  (let ((c (lparallel:make-channel))
        (q (lparallel.queue:make-queue)))
    (lparallel.queue:push-queue 0 q)
    (lparallel:submit-task c #'foo q)
    (loop do (sleep .2)
             (print (lparallel.queue:peek-queue q))
          when (lparallel.queue:queue-empty-p q)
            do (return)))
  (lparallel:end-kernel :wait t))
Run Code Online (Sandbox Code Playgroud)

这可以按预期的方式产生输出:

* (test)

0
0
0
0
NIL
(#<SB-THREAD:THREAD "lparallel" FINISHED values: NIL {10068F2B03}>)
Run Code Online (Sandbox Code Playgroud)

我的问题是我是否正确或完全使用了lparallel的队列功能。似乎队列只是使用全局变量来保存线程共享对象的替代品。使用队列的设计优势是什么?通常为每个提交的任务分配一个队列(假设任务需要进行通信)是一种好习惯吗?感谢您提供更深入的见解。

cor*_*ump 5

多线程工作是通过管理对可变共享状态的并发访问来完成的,即,您拥有对通用数据结构的锁定,并且每个线程都对其进行读写。

但是,建议尽量减少同时访问的数据数量。队列是通过使每个线程管理其本地状态并仅通过消息交换数据来使工作人员彼此分离的一种方法。这是线程安全的,因为对队列的访问由锁和条件变量控制

您在主线程中正在做的事情是在队列为空时进行轮询。这可能有效,但这适得其反,因为队列被用作同步机制,但是您在这里自己进行同步。

(ql:quickload :lparallel)
(defpackage :so (:use :cl
                      :lparallel
                      :lparallel.queue
                      :lparallel.kernel-util))
(in-package :so)
Run Code Online (Sandbox Code Playgroud)

让我们进行更改foo,使其得到两个队列,一个队列用于传入的请求,一个队列用于答复。在这里,我们对发送的数据执行简单的转换,对于每条输入消息,仅存在一条输出消息,但是不一定总是这样。

(defun foo (in out)
  (push-queue (1+ (pop-queue in)) out))
Run Code Online (Sandbox Code Playgroud)

进行更改test,以使控制流仅基于对队列的读/写:

(defun test ()
  (with-temp-kernel (1)
    (let ((c (make-channel))
          (foo-in (make-queue))
          (foo-out (make-queue)))
      (submit-task c #'foo foo-in foo-out)
      ;; submit data to task (could be blocking)
      (push-queue 0 foo-in)
      ;; wait for message from task (could be blocking too)
      (pop-queue foo-out))))
Run Code Online (Sandbox Code Playgroud)

但是,如果有多个任务正在运行,如何避免在测试中进行轮询?您是否不需要连续检查其中的任何一个完成,以便您可以将更多工作推入队列?

您可以使用不同的并发机制,类似于listenpoll / epoll,您可以在其中监视多个事件源,并在事件之一准备就绪时做出反应。很自然就可以使用Goselect)和Erlangreceive)之类的语言。在Lisp方面,Calispel库提供了类似的交替机制(pri-altfair-alt)。例如,以下内容取自Calispel的测试代码:

(pri-alt ((? control msg)
          (ecase msg
            (:clean-up (setf cleanup? t))
            (:high-speed (setf slow? nil))
            (:low-speed (setf slow? t))))
         ((? channel msg)
          (declare (type fixnum msg))
          (vector-push-extend msg out))
         ((otherwise :timeout (if cleanup? 0 nil))
          (! reader-results out)
          (! thread-expiration (bt:current-thread))
          (return)))
Run Code Online (Sandbox Code Playgroud)

对于lparallel,没有这种机制,但是只要您使用标识符标记消息,就可以只使用队列。

如果您需要尽快作出反应无论是任务t1还是t2给出了结果,然后使双方在同一个结果的线路的任务写的:

(let ((t1 (foo :id 1 :in i1 :out res))
      (t2 (bar :id 2 :in i2 :out res)))
   (destructuring-bind (id message) (pop-queue res)
     (case id
       (1 ...)
       (2 ...))))
Run Code Online (Sandbox Code Playgroud)

如果您需要同时同步这两个代码t1t2发出结果,请让它们在不同的通道中编写:

(let ((t1 (foo :id 1 :in i1 :out o1))
      (t2 (bar :id 2 :in i2 :out o2)))
   (list (pop-queue o1)
         (pop-queue o2)))
Run Code Online (Sandbox Code Playgroud)

  • 简单的解释!您必须是CS教授。我想我现在会坚持使用lparallel,但是很了解其他人(horcrux问题)很高兴。 (2认同)