clojure ref的奇怪行为

Ser*_*dov 7 concurrency clojure ref stm

我有100个工作人员(代理人)共享一个ref包含任务集合的人员.虽然这个集合有任务,但每个工作者从这个集合中获取一个任务(在dosync块中),打印它,有时将它放回集合中(在dosync块中):

(defn have-tasks?
  [tasks]
  (not (empty? @tasks)))

(defn get-task
  [tasks]
  (dosync
    (let [task (first @tasks)]
      (alter tasks rest)
      task)))

(defn put-task
  [tasks task]
  (dosync (alter tasks conj task))
  nil)

(defn worker
  [& {:keys [tasks]}]
  (agent {:tasks tasks}))

(defn worker-loop
  [{:keys [tasks] :as state}]
  (while (have-tasks? tasks)
    (let [task (get-task tasks)]
      (println "Task: " task)
      (when (< (rand) 0.1)
        (put-task tasks task))))
  state)

(defn create-workers
  [count & options]
  (->> (range 0 count)
       (map (fn [_] (apply worker options)))
       (into [])))

(defn start-workers
  [workers]
  (doseq [worker workers] (send-off worker worker-loop)))

(def tasks (ref (range 1 10000000)))

(def workers (create-workers 100 :tasks tasks))

(start-workers workers)
(apply await workers)
Run Code Online (Sandbox Code Playgroud)

当我运行该代码,通过剂打印的最后一个值是(多次尝试后): , 435445, 4556294, .1322061 3950017但绝不9999999是我所期待的.而且每次收藏真的都是空的.我做错了什么?

编辑:

我尽可能简单地重写了worker-loop:

(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop []
    (when-let [task (get-task tasks)]
      (println "Task: " task)
      (recur)))
  state)
Run Code Online (Sandbox Code Playgroud)

但问题仍然存在.创建一个且仅一个工作程序时,此代码的行为与预期相同.

小智 4

这里的问题与代理无关,也与懒惰几乎没有任何关系。这是原始代码的简化版本,但仍然存在问题:

(defn f [init]
  (let [state (ref init)
        task (fn []
               (loop [last-n nil]
                 (if-let [n (dosync
                              (let [n (first @state)]
                                (alter state rest)
                                n))]
                   (recur n)
                   (locking :out
                     (println "Last seen:" last-n)))))
        workers (->> (range 0 5)
                     (mapv (fn [_] (Thread. task))))]
    (doseq [w workers] (.start w))
    (doseq [w workers] (.join w))))

(defn r []
  (f (range 1 100000)))

(defn i [] (f (->> (iterate inc 1)
                   (take 100000))))

(defn t []
  (f (->> (range 1 100000)
          (take Integer/MAX_VALUE))))
Run Code Online (Sandbox Code Playgroud)

运行此代码表明,it,两者都是惰性的,可靠地工作,而r可靠地则不然。该问题实际上是调用返回的类中的并发错误range。事实上,该错误已记录在此 Clojure 票证中,并从 Clojure 版本 开始已修复1.9.0-alpha11

快速总结一下错误,以防由于某种原因无法访问票证:在对rest的结果进行调用的内部range,有一个很小的机会出现竞争条件:“标志”表示“下一个值已经被计算”是在实际值本身之前设置的,这意味着第二个线程可以将该标志视为 true,即使“下一个值”仍然是nilalter然后调用将修复nilref 上的该值。它已通过交换两条分配线来修复。

如果结果range是在单个线程中强制实现或包装在另一个惰性序列中,则不会出现该错误。