Cha*_*sen 6 multithreading clojure
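I have two different functions that operate on two very large data sets, and each ultimately boils down to a boolean value. These two values then need to be combined into a final result. My question is: what is the best way to create threads so that the two long-running functions can run concurrently? My idea was something like this: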
(def f (future (longProcessOne data_one)))
(def g (future (longProcessTwo data_two)))
(and @f @g)
but I've been looking for input on better ways to do this.
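To make the question's approach concrete, here is a self-contained sketch. long-process-one and long-process-two are hypothetical stand-ins for the question's longProcessOne and longProcessTwo (they just sleep and then test a predicate), and a let replaces the top-level defs so the futures are local to one call. Both futures start running as soon as they are created, so the total wait is roughly the longer of the two runs rather than their sum.

```clojure
;; Hypothetical stand-ins for the question's long-running functions:
;; each sleeps to simulate work and then returns a boolean.
(defn long-process-one [data]
  (Thread/sleep 300)
  (every? even? data))

(defn long-process-two [data]
  (Thread/sleep 100)
  (every? odd? data))

(defn both-true?
  "Runs both checks concurrently in futures and combines the results."
  [data-one data-two]
  (let [f (future (long-process-one data-one))  ; starts running immediately
        g (future (long-process-two data-two))] ; runs concurrently with f
    ;; and derefs f first, so this always waits for f to finish,
    ;; even if g has already come back false.
    (and @f @g)))
```

For example, (both-true? [2 4] [1 3]) returns true and (both-true? [2 4] [2]) returns false. Because and derefs in order, this version never short-circuits on whichever result happens to arrive first.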
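Your approach is fairly normal Clojure code. Another option is to use promises; if you need more complex processing, you could consider something like lamina; or, if you want to live on the bleeding edge, you could try core.async: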
(ns async-example.core
  (:require [clojure.core.async :refer [chan go timeout <! >! <!!]]))

(defn example []
  (let [a (chan)       ; a channel for process a to report its answer
        b (chan)       ; a channel for process b to report its answer
        output (chan)] ; a channel for the reporter to report back to the REPL
    (go (<! (timeout (rand-int 1000))) ; process a
        (>! a (rand-nth [true false])))
    (go (<! (timeout (rand-int 1000))) ; process b
        (>! b (rand-nth [true false])))
    (go (>! output (and (<! a) (<! b)))) ; the reporter process
    output)) ; return the channel that the result will be sent to
async-example.core> (<!! (go (<! (example))))
false
async-example.core> (<!! (go (<! (example))))
false
async-example.core> (<!! (go (<! (example))))
true
Of course, this is overkill for your case, but it's very interesting anyway ;-)
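The promise option mentioned above can be sketched without any extra library. A promise decouples a result from whatever produces it, so the work can run on a plain java.lang.Thread (or inside a callback) rather than in a future. run-on-thread and both-true-with-promises are hypothetical helper names; like the question's version, this still waits on the first promise before looking at the second.

```clojure
(defn run-on-thread
  "Hypothetical helper: starts a plain java.lang.Thread running the
  no-argument function f and returns the Thread."
  [f]
  (doto (Thread. ^Runnable f)
    (.start)))

(defn both-true-with-promises
  "Hypothetical helper: runs (f-one) and (f-two) on separate threads;
  each thread delivers its result, coerced to a boolean, to its own promise."
  [f-one f-two]
  (let [p1 (promise)
        p2 (promise)]
    (run-on-thread #(deliver p1 (boolean (f-one))))
    (run-on-thread #(deliver p2 (boolean (f-two))))
    ;; deref blocks until the corresponding promise has been delivered
    (and @p1 @p2)))
```

One design caveat: if f-one throws, its thread dies without delivering p1, and the deref blocks forever, so real code would want a try/catch around the call or a deref with a timeout.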
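(The promise-based approach is at the top, the core.async-based one below. Both short-circuit on the first falsey value.)
Here's a version that takes advantage of the fact that a single promise can be delivered to multiple times (only the first delivery actually sets its value; subsequent deliveries simply return nil with no side effects).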
(defn thread-and
  "Computes logical conjunction of return values of fs, each of which
  is called in a future. Short-circuits (cancelling the remaining
  futures) on first falsey value."
  [& fs]
  (let [done (promise)
        ret  (atom true)
        fps  (promise)]
    (deliver fps (doall (for [f fs]
                          (let [p (promise)]
                            [(future
                               (if-not (swap! ret #(and %1 %2) (f))
                                 (deliver done true))
                               (locking fps
                                 (deliver p true)
                                 (when (every? realized? (map peek @fps))
                                   (deliver done true))))
                             p]))))
    @done
    (doseq [[fut] @fps]
      (future-cancel fut))
    @ret))
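The function above depends on two properties of clojure.core/promise that are easy to check at the REPL: only the first deliver sets the value (a successful deliver returns the promise itself, later calls return nil), and this stays true when several threads race to deliver, which is why many futures can safely deliver to the shared done promise. promise-demo and count-successful-deliveries are hypothetical names used only for this illustration.

```clojure
(defn promise-demo
  "Delivers to one promise twice and records what each step returned."
  []
  (let [p          (promise)
        before     (realized? p)        ; nothing delivered yet
        first-ret  (deliver p :first)   ; succeeds, returns the promise itself
        second-ret (deliver p :second)] ; ignored, returns nil
    {:realized-before before
     :first-ret-is-p  (identical? p first-ret)
     :second-ret      second-ret
     :value           @p
     :realized-after  (realized? p)}))

(defn count-successful-deliveries
  "Starts n futures that all race to deliver to the same promise and
  counts how many deliveries succeeded (a non-nil return from deliver)."
  [n]
  (let [p         (promise)
        successes (atom 0)
        futs      (doall (for [i (range n)]
                           (future (when (deliver p i)
                                     (swap! successes inc)))))]
    (run! deref futs) ; wait for every future to finish
    @successes))
```

Calling (count-successful-deliveries 50) returns 1: however the threads interleave, exactly one deliver wins.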
Some tests:
(thread-and (constantly true) (constantly true))
;;= true
(thread-and (constantly true) (constantly false))
;;= false
(every? false?
        (repeatedly 100000
                    #(thread-and (constantly true) (constantly false))))
;;= true
;; prints :foo, but not :bar
(thread-and #(do (Thread/sleep 1000) (println :foo))
            #(do (Thread/sleep 3000) (println :bar)))
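The ":foo but not :bar" test works because future-cancel calls .cancel(true) on the underlying java.util.concurrent.Future, which interrupts the worker thread; a blocking call such as Thread/sleep reacts by throwing InterruptedException. The hypothetical cancel-demo below shows what a caller observes after cancelling. Interruption is cooperative, though: a CPU-bound loop that never blocks or checks Thread/interrupted keeps running after future-cancel returns, so the short-circuit only saves work for functions that respond to interruption.

```clojure
(defn cancel-demo
  "Cancels a sleeping future and reports what the caller observes."
  []
  (let [sleeper    (future (Thread/sleep 10000) :finished)
        cancel-ret (future-cancel sleeper)] ; true: the task had not completed
    {:cancel-returned cancel-ret
     :cancelled?      (future-cancelled? sleeper)
     ;; dereferencing a cancelled future throws instead of returning a value
     :deref-result    (try @sleeper
                           (catch java.util.concurrent.CancellationException _
                             :cancellation-exception))}))
```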
Putting Arthur's and A. Webb's ideas together, you can use core.async to gather the results while short-circuiting on the first falsey value returned:
(require '[clojure.core.async :refer [chan >!! alts!!]])

(defn thread-and
  "Call each of the fs on a separate thread. Return logical
  conjunction of the results. Short-circuit (and cancel the calls
  to remaining fs) on first falsey value returned."
  [& fs]
  (let [futs-and-cs
        (doall (for [f fs]
                 (let [c (chan)]
                   ;; core.async channels cannot carry nil, so coerce each
                   ;; result to a boolean before putting it on the channel
                   [(future (>!! c (boolean (f)))) c])))]
    (loop [futs-and-cs futs-and-cs]
      (if (seq futs-and-cs)
        (let [[result c] (alts!! (mapv peek futs-and-cs))]
          (if result
            (recur (remove #(identical? (peek %) c)
                           futs-and-cs))
            (do (doseq [fut (map first futs-and-cs)]
                  (future-cancel fut))
                false)))
        true))))
Testing with (constantly true) and (constantly false):
(thread-and (constantly true) (constantly true))
;= true
(thread-and (constantly true) (constantly false))
;= false
;;; etc.
Note also that the short-circuiting really works:
;;; prints :foo before returning false
(thread-and #(do (Thread/sleep 3000) false)
            #(do (Thread/sleep 1000) (println :foo)))
;;; does not print :foo
(thread-and #(do (Thread/sleep 3000) false)
            #(do (Thread/sleep 7000) (println :foo)))
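If pulling in core.async just for this is unattractive, the same "consume results in completion order" idea can be sketched with a java.util.concurrent.LinkedBlockingQueue from the JDK standing in for the channels. This is a swapped-in mechanism, not the answer's alts!! approach: every future puts its boolean-coerced result on one shared queue, and the caller takes results as they arrive. queue-and is a hypothetical name, and like the versions above it assumes none of the fs throw; a future whose function throws never puts a result, so the caller could block forever waiting for it.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn queue-and
  "Hypothetical core.async-free variant: calls each of fs in a future,
  reads the results in completion order, and short-circuits (cancelling
  the remaining futures) on the first falsey result."
  [& fs]
  (let [q    (LinkedBlockingQueue.)
        ;; LinkedBlockingQueue rejects nil, so coerce results to booleans
        futs (mapv (fn [f] (future (.put q (boolean (f))))) fs)]
    (loop [remaining (count futs)]
      (cond
        (zero? remaining) true            ; every result was truthy
        (.take q) (recur (dec remaining)) ; blocks until some future finishes
        :else (do (run! future-cancel futs)
                  false)))))
```

For example, (queue-and (constantly true) (constantly false)) returns false, and calling (queue-and) with no functions returns true immediately.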