pol*_*ath 5 concurrency multithreading common-lisp event-loop green-threads
我是编程的新手,也是 Common Lisp 的新手。我正在尝试解决以下问题:
有流源(S1、S2、S3)、两个处理器(P1、P2)(处理器不是指 CPU 处理器,而是处理功能/子系统)和四个流目的地(D1、D2、D3、 D4)。通过流,我的意思是数据连续出现,但可能是间歇性的。所以读/写操作可能会阻塞。来自流的数据需要聚合,然后根据它们是什么,由处理器之一进行处理。每个处理器(从其输入聚合)产生四种子聚合,然后将这些子聚合发送到上述四个目的地之一。
我对这个问题的伪代码是这样的:
三个循环,S1、S2、S3 各一个,用于数据聚合和调度。对于特定来源(例如 S2),伪代码是:
loop-2 : read from S2,
when sufficient data aggregate,
if aggregate is kind-1 send to P1
else send to P2.
Run Code Online (Sandbox Code Playgroud)
两个循环,每个循环一个用于 P1、P2,用于处理和解聚合。对于特定的处理器(比如 P1),伪代码是:
In a loop for P1: take aggregate of kind-1,
process it to produce one or some of four sub-aggregates.
case: sub-aggregate 1 -> serialize and send/write to D1,
sub-aggregate 2 -> serialize and send/write to D2,
sub-aggregate 3 -> serialize and send/write to D3,
sub-aggregate 4 -> serialize and send/write to D4.
Run Code Online (Sandbox Code Playgroud)
在处理器和目的地之间,我想使用另一个通道/队列,但没有提到让我的问题对自己更简单。
据我了解,大约有十个阻塞点。三、如果数据在来源(S1、S2、S3)中不可用;三个时处理器(P1、P2)上的数据不足,四个时目的地(D1、D2、D3、D4)未准备好写入。
所以我的问题是:我如何同时运行所有这些循环(同时运行这个词?)而不会相互阻塞;以及程序中正在完成的其他事情。
之前的工作:我看过这个SO question, pollers and notifiers from cl-async。看着Cliki-Concurrency,我也看到了threading-queue。更不用说cl-flow和green-threads 了。CL-flow 提到它是“Common Lisp 中异步非阻塞并发的库”。有没有办法,我可以使用带有 cl-async 或 cl-flow 的队列库?例如,flow-concurrently似乎是正确的事情。但是它可以与 cl-async 的绿色线程一起使用来运行非终止循环吗?
我的困难:我试图阅读我上面提到的每个链接并尽可能多地理解。但是,说实话,我什至无法弄清楚从哪里/如何开始。谢谢你的帮助。
编辑:2020 年 2 月 28 日。
我试过像这样使用 cl-flow 。初始化流程系统后
(ql:quickload "simple-flow-dispatcher")
(ql:quickload "cl-muth")
(ql:quickload "cl-flow")
(ql:quickload "bt-semaphore")
(use-package :cl-flow :bt-semaphore)
(defvar *output* *standard-output*)
(defun print-error (e)
(format *output* "~A" e))
(defvar *dispatcher* (simple-flow-dispatcher:make-simple-dispatcher
:threads 4
:error-handler #'print-error))
(defun run-flow (flow)
(run *dispatcher* flow))
Run Code Online (Sandbox Code Playgroud)
我使用了以下功能:
(defun hello-world (n)
(sleep n)
(format t (concatenate 'string (gap n) "Loop-" (write-to-string n) "~%")))
(defun gap(n)
(if (= n 0)
""
(concatenate 'string " " (gap (- n 1)))))
(defun loop-hello (n)
(loop
(hello-world n)))
(defun test-flow ()
(run-flow
(flow:concurrently
(flow:asynchronously
(loop-hello 3))
(flow:asynchronously
(loop-hello 2))
(flow:asynchronously
(loop-hello 1)))))
Run Code Online (Sandbox Code Playgroud)
hello-world 是一个简单的函数,我结合了函数 gap 在文本前插入一些空格。结果,我期待三列交错的打印行(并发、异步等),但我只得到了一个(第一个循环问候)列。
我目前的理解如下,请指正:loop和sleep都是同步语句,所以执行会被阻塞。所以我搜索了循环和睡眠的异步版本。我想知道我是否可以使用 cl-async 来做到这一点。
与此同时,我试图简化我的问题。昨天,我也偶然发现了这个问题及其答案。我试图理解它。
此外,由于我使用伪代码发布了我的原始问题,我想知道我希望(比如说)一个库为我提供功能(即使它现在不存在)。
我尝试在 google 中搜索异步代码,大多数结果都显示了 javascript 库。直到现在我只能模糊地掌握两个。Python、Clojure(Java)。编写一个小函数,该函数从 q1 异步读取,处理从 v1 读取到 v2 的值,然后写入 q2(q1 和 q2 是队列)。对我来说,似乎我可以这样写(说):
async def p1(x):
#compute v2 from v1 and return v2
async def keep_doing_p1(q1, q2):
while True:
v1 = await q1.pop()
v2 = p1(v1) # function p1 defined above
await q2.push(v2)
Run Code Online (Sandbox Code Playgroud)
当然,必须有 q1 和 q2 的合适实现。在 Clojure(Java) 中,适当地使用 core.async 库:
(def q1 (async/chan))
(def q2 (async/chan))
(defn p1 [v1]
;;; Need a transducer version
;;; compute v2 from v1 and return)
(async/go
(-->
(async/< q1)
(p1)
(async/q2)))
Run Code Online (Sandbox Code Playgroud)
我上面发布的代码可能是错误的。我是根据我(很可能)肤浅的理解写的。我没有 Python、Clojure(Java) 经验。这些例子也不是为了煽动任何火焰战争。我认为所有语言都可以(是)优秀、强大、方便,并迎合不同的人类程序员。我想在 CL 中解决这个问题。我是一个资深的人(按年龄)但是一个大三的,实际上是一个新手(在编程方面)。我真的很喜欢用 CL 写它。并不是说我已经成为或即将成为一名伟大的程序员。但是用CL我感觉,我的天啊,连我都可以写程序!
不管怎样,所以回到问题。所以我想要一个库来提供这样的语法/功能:
(cc-forever q1 q2 p1
(push q2 (p1 (pop q1))))
;;; many other such cc-forever, operating concurrently
Run Code Online (Sandbox Code Playgroud)
您可能已经猜到了,cc-forever 的意思是 (ConCurrently-run FOREVER)。
所以我正在寻求帮助来制作这样的函数/库。如果我的思考过程仍然很复杂,请提出更好/更简单的替代方案。谢谢。
稍后会再次发布我的尝试。
编辑:2020 年 5 月 27 日
我正在继续探索 cl-async 和 cl-flow。我最近遇到了在 cl-async 之上实现的 with-green-thread 宏。作为一个新手,我面临双重问题:这些库使用不同的语法,并且可能使用它在语义上做不同的事情。目前我对 with-green-thread 和 cl-flow: 反复感到鼓舞。虽然有时代码会抛出错误(可能是我对一些 quicklisp 问题或其他问题的处理不当)。会继续发帖。如果有任何反馈,将有所帮助。
| 归档时间: |
|
| 查看次数: |
367 次 |
| 最近记录: |