Ser*_*dov 7 concurrency clojure ref stm
我有100个工作人员(代理人)共享一个ref包含任务集合的人员.虽然这个集合有任务,但每个工作者从这个集合中获取一个任务(在dosync块中),打印它,有时将它放回集合中(在dosync块中):
(defn have-tasks?
[tasks]
(not (empty? @tasks)))
(defn get-task
[tasks]
(dosync
(let [task (first @tasks)]
(alter tasks rest)
task)))
(defn put-task
[tasks task]
(dosync (alter tasks conj task))
nil)
(defn worker
[& {:keys [tasks]}]
(agent {:tasks tasks}))
(defn worker-loop
[{:keys [tasks] :as state}]
(while (have-tasks? tasks)
(let [task (get-task tasks)]
(println "Task: " task)
(when (< (rand) 0.1)
(put-task tasks task))))
state)
(defn create-workers
[count & options]
(->> (range 0 count)
(map (fn [_] (apply worker options)))
(into [])))
(defn start-workers
[workers]
(doseq [worker workers] (send-off worker worker-loop)))
(def tasks (ref (range 1 10000000)))
(def workers (create-workers 100 :tasks tasks))
(start-workers workers)
(apply await workers)
Run Code Online (Sandbox Code Playgroud)
当我运行该代码,通过剂打印的最后一个值是(多次尝试后): ,
435445,
4556294,
.1322061
3950017但绝不9999999是我所期待的.而且每次收藏真的都是空的.我做错了什么?
编辑:
我尽可能简单地重写了worker-loop:
(defn worker-loop
[{:keys [tasks] :as state}]
(loop []
(when-let [task (get-task tasks)]
(println "Task: " task)
(recur)))
state)
Run Code Online (Sandbox Code Playgroud)
但问题仍然存在.创建一个且仅一个工作程序时,此代码的行为与预期相同.
小智 4
这里的问题与代理无关,也与懒惰几乎没有任何关系。这是原始代码的简化版本,但仍然存在问题:
(defn f [init]
(let [state (ref init)
task (fn []
(loop [last-n nil]
(if-let [n (dosync
(let [n (first @state)]
(alter state rest)
n))]
(recur n)
(locking :out
(println "Last seen:" last-n)))))
workers (->> (range 0 5)
(mapv (fn [_] (Thread. task))))]
(doseq [w workers] (.start w))
(doseq [w workers] (.join w))))
(defn r []
(f (range 1 100000)))
(defn i [] (f (->> (iterate inc 1)
(take 100000))))
(defn t []
(f (->> (range 1 100000)
(take Integer/MAX_VALUE))))
Run Code Online (Sandbox Code Playgroud)
运行此代码表明,i和t,两者都是惰性的,可靠地工作,而r可靠地则不然。该问题实际上是调用返回的类中的并发错误range。事实上,该错误已记录在此 Clojure 票证中,并从 Clojure 版本 开始已修复1.9.0-alpha11。
快速总结一下错误,以防由于某种原因无法访问票证:在对rest的结果进行调用的内部range,有一个很小的机会出现竞争条件:“标志”表示“下一个值已经被计算”是在实际值本身之前设置的,这意味着第二个线程可以将该标志视为 true,即使“下一个值”仍然是nil。alter然后调用将修复nilref 上的该值。它已通过交换两条分配线来修复。
如果结果range是在单个线程中强制实现或包装在另一个惰性序列中,则不会出现该错误。