具有资格的生产者消费者

tgg*_*guy 21 clojure

我是clojure的新手,我正在尝试理解如何正确使用它的并发功能,所以任何批评/建议都值得赞赏.所以我试图在clojure中编写一个小测试程序,其工作方式如下:

  1. 有5个生产者和2个消费者
  2. 生产者等待随机时间,然后将数字推送到共享队列.
  3. 一旦队列非空,消费者应该从队列中拉出一个数字,然后在短时间内休眠以模拟工作
  4. 消费者应该在队列为空时阻止
  5. 生产者应该阻止队列中有超过4个项目,以防止它增长巨大

以下是我对上述每个步骤的计划:

  1. 生产者和消费者将是不真正关心他们的国家的代理人(只是零值或某事); 我只是使用代理发送一个"消费者"或"生产者"功能来做某个时间.然后共享队列将是(def队列(ref [])).也许这应该是一个原子呢?
  2. 在"producer"代理函数中,简单地(Thread/sleep(rand-int 1000))然后(dosync(alter queue conj(rand-int 100)))推入队列.
  3. 我想让消费者代理使用add-watcher观察队列的变化.虽然不确定这一点,但它会让消费者在任何变化中醒来,即使变化来自消费者拉动某些东西(可能使其变空).也许在观察者功能中检查这一点就足够了.我看到的另一个问题是,如果所有消费者都很忙,那么当生产者向队列添加新内容时会发生什么?观看的事件是否在某个消费者代理上排队或者是否消失了?
  4. 往上看
  5. 我真的不知道该怎么做.我听说clojure的seque可能有用,但我找不到足够的doc如何使用它,我的初始测试似乎不起作用(抱歉我的代码不再有了)

Mic*_*zyk 24

这是我的看法.我特别指出只使用Clojure数据结构来看看它是如何工作的.请注意,从Java工具箱中获取阻塞队列并在此处使用它是完全通常和惯用的.我想,代码很容易适应.更新:我实际上已经适应了它java.util.concurrent.LinkedBlockingQueue,见下文.

clojure.lang.PersistentQueue

打电话(pro-con)开始试运行; 然后查看内容,output看是否发生了任何事情,queue-lengths看看它们是否保持在给定范围内.

更新:为了解释为什么我觉得需要在ensure下面使用(我在IRC上被问到这个问题),这是为了防止写入偏斜(请参阅维基百科关于快照隔离的文章定义).如果我替换@queue(ensure queue),那么两个或更多生产者可以检查队列的长度,发现它小于4,然后在队列上放置其他项目,并可能使队列的总长度超过4,打破约束.同样,两个消费者@queue可以接受相同的项目进行处理,然后从队列中弹出两个项目.ensure防止这些情况发生.

(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn queue-length-watch [_ _ _ new-queue-state]
  (dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

(defn producer [tag]
  (future
   (while @go-on?
     (if (dosync (let [l (count (ensure queue))]
                   (when (< l *max-queue-length*)
                     (alter queue conj tag)
                     true)))
       (Thread/sleep (rand-int 2000))))))

(defn consumer []
  (future
   (while @go-on?
     (Thread/sleep 100)       ; don't look at the queue too often
     (when-let [item (dosync (let [item (first (ensure queue))]
                               (alter queue pop)
                               item))]
       (Thread/sleep (rand-int 500))         ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
Run Code Online (Sandbox Code Playgroud)

java.util.concurrent.LinkedBlockingQueue中

上面写的一个版本使用LinkedBlockingQueue.请注意代码的大致轮廓基本相同,其中一些细节实际上稍微清晰一些.我queue-lengths从这个版本中移除了,因为LBQ我们处理了这个约束.

(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn producer [tag]
  (future
   (while @go-on?
     (.put queue tag)
     (Thread/sleep (rand-int 2000)))))

(defn consumer []
  (future
   (while @go-on?
     ;; I'm using .poll on the next line so as not to block
     ;; indefinitely if we're done; note that this has the
     ;; side effect that nulls = nils on the queue will not
     ;; be handled; there's a number of other ways to go about
     ;; this if this is a problem, see docs on LinkedBlockingQueue
     (when-let [item (.poll queue)]
       (Thread/sleep (rand-int 500)) ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
Run Code Online (Sandbox Code Playgroud)

  • 注意:java队列不能正确处理nil值,因此必须用特殊值替换它们.见http://clj-me.cgrand.net/2010/04/02/pipe-dreams-are-not-necessarily-made-of-promises/ (3认同)