如何在 Common Lisp 中同时运行多个阻塞循环。[将 cl-async 与线程池中的队列结合起来]

pol*_*ath 5 concurrency multithreading common-lisp event-loop green-threads

我是编程的新手,也是 Common Lisp 的新手。我正在尝试解决以下问题:

有流源(S1、S2、S3)、两个处理器(P1、P2)(处理器不是指 CPU 处理器,而是处理功能/子系统)和四个流目的地(D1、D2、D3、 D4)。通过流,我的意思是数据连续出现,但可能是间歇性的。所以读/写操作可能会阻塞。来自流的数据需要聚合,然后根据它们是什么,由处理器之一进行处理。每个处理器(从其输入聚合)产生四种子聚合,然后将这些子聚合发送到上述四个目的地之一。

我对这个问题的伪代码是这样的:

三个循环,S1、S2、S3 各一个,用于数据聚合和调度。对于特定来源(例如 S2),伪代码是:

 loop-2 : read from S2,
            when sufficient data aggregate, 
                if aggregate is kind-1 send to P1 
                  else send to P2.
Run Code Online (Sandbox Code Playgroud)

两个循环,每个循环一个用于 P1、P2,用于处理和解聚合。对于特定的处理器(比如 P1),伪代码是:

In a loop for P1:   take aggregate of kind-1,
                       process it to produce one or some of four sub-aggregates.
                          case: sub-aggregate 1 -> serialize and send/write to D1,
                                sub-aggregate 2 -> serialize and send/write to D2,
                                sub-aggregate 3 -> serialize and send/write to D3,
                                sub-aggregate 4 -> serialize and send/write to D4.
Run Code Online (Sandbox Code Playgroud)

在处理器和目的地之间,我想使用另一个通道/队列,但没有提到让我的问题对自己更简单。

据我了解,大约有十个阻塞点。三、如果数据在来源(S1、S2、S3)中不可用;三个时处理器(P1、P2)上的数据不足,四个时目的地(D1、D2、D3、D4)未准备好写入。

所以我的问题是:我如何同时运行所有这些循环(同时运行这个词?)而不会相互阻塞;以及程序中正在完成的其他事情。

之前的工作:我看过这个SO question, pollers and notifiers from cl-async。看着Cliki-Concurrency,我也看到了threading-queue。更不用说cl-flowgreen-threads 了。CL-flow 提到它是“Common Lisp 中异步非阻塞并发的库”。有没有办法,我可以使用带有 cl-async 或 cl-flow 的队列库?例如,flow-concurrently似乎是正确的事情。但是它可以与 cl-async 的绿色线程一起使用来运行非终止循环吗?

我的困难:我试图阅读我上面提到的每个链接并尽可能多地理解。但是,说实话,我什至无法弄清楚从哪里/如何开始。谢谢你的帮助。

编辑:2020 年 2 月 28 日。

我试过像这样使用 cl-flow 。初始化流程系统后


(ql:quickload "simple-flow-dispatcher")
(ql:quickload "cl-muth")
(ql:quickload "cl-flow")
(ql:quickload "bt-semaphore")

(use-package :cl-flow :bt-semaphore)

(defvar *output* *standard-output*)

(defun print-error (e)
  (format *output* "~A" e))

(defvar *dispatcher* (simple-flow-dispatcher:make-simple-dispatcher
                      :threads 4
                      :error-handler #'print-error))

(defun run-flow (flow)
  (run *dispatcher* flow))

Run Code Online (Sandbox Code Playgroud)

我使用了以下功能:


(defun hello-world (n)
  (sleep n)
  (format t (concatenate 'string (gap n) "Loop-" (write-to-string n) "~%")))


(defun gap(n)
  (if (= n 0)
      ""
      (concatenate 'string "         " (gap (- n 1)))))

(defun loop-hello (n)
  (loop
     (hello-world n)))

(defun test-flow ()
  (run-flow 
     (flow:concurrently
         (flow:asynchronously
          (loop-hello 3))
         (flow:asynchronously
          (loop-hello 2))
         (flow:asynchronously
         (loop-hello 1)))))

Run Code Online (Sandbox Code Playgroud)

hello-world 是一个简单的函数,我结合了函数 gap 在文本前插入一些空格。结果,我期待三列交错的打印行(并发、异步等),但我只得到了一个(第一个循环问候)列。

我目前的理解如下,请指正:loop和sleep都是同步语句,所以执行会被阻塞。所以我搜索了循环和睡眠的异步版本。我想知道我是否可以使用 cl-async 来做到这一点。

与此同时,我试图简化我的问题。昨天,我也偶然发现了这个问题及其答案。我试图理解它。

此外,由于我使用伪代码发布了我的原始问题,我想知道我希望(比如说)一个库为我提供功能(即使它现在不存在)。

我尝试在 google 中搜索异步代码,大多数结果都显示了 javascript 库。直到现在我只能模糊地掌握两个。Python、Clojure(Java)。编写一个小函数,该函数从 q1 异步读取,处理从 v1 读取到 v2 的值,然后写入 q2(q1 和 q2 是队列)。对我来说,似乎我可以这样写(说):

async def p1(x):
    #compute v2 from v1 and return v2

async def keep_doing_p1(q1, q2):
    while True:
        v1 = await q1.pop()
        v2 = p1(v1) # function p1 defined above
        await q2.push(v2)

Run Code Online (Sandbox Code Playgroud)

当然,必须有 q1 和 q2 的合适实现。在 Clojure(Java) 中,适当地使用 core.async 库:

(def q1 (async/chan))
(def q2 (async/chan))

(defn p1 [v1]
  ;;; Need a transducer version
  ;;; compute v2 from v1 and return)

(async/go
   (-->
       (async/< q1)
       (p1)
       (async/q2)))

Run Code Online (Sandbox Code Playgroud)

我上面发布的代码可能是错误的。我是根据我(很可能)肤浅的理解写的。我没有 Python、Clojure(Java) 经验。这些例子也不是为了煽动任何火焰战争。我认为所有语言都可以(是)优秀、强大、方便,并迎合不同的人类程序员。我想在 CL 中解决这个问题。我是一个资深的人(按年龄)但是一个大三的,实际上是一个新手(在编程方面)。我真的很喜欢用 CL 写它。并不是说我已经成为或即将成为一名伟大的程序员。但是用CL我感觉,我的天啊,连我都可以写程序!

不管怎样,所以回到问题。所以我想要一个库来提供这样的语法/功能:

(cc-forever q1 q2 p1
  (push q2 (p1 (pop q1))))

;;; many other such cc-forever, operating concurrently

Run Code Online (Sandbox Code Playgroud)

您可能已经猜到了,cc-forever 的意思是 (ConCurrently-run FOREVER)。

所以我正在寻求帮助来制作这样的函数/库。如果我的思考过程仍然很复杂,请提出更好/更简单的替代方案。谢谢。

稍后会再次发布我的尝试。

编辑:2020 年 5 月 27 日

我正在继续探索 cl-async 和 cl-flow。我最近遇到了在 cl-async 之上实现的 with-green-thread 宏。作为一个新手,我面临双重问题:这些库使用不同的语法,并且可能使用它在语义上做不同的事情。目前我对 with-green-thread 和 cl-flow: 反复感到鼓舞。虽然有时代码会抛出错误(可能是我对一些 quicklisp 问题或其他问题的处理不当)。会继续发帖。如果有任何反馈,将有所帮助。