w8s*_*std 0 concurrency clojure
(defn DoubleFrequency []
(def s (slurp "Example.txt"))
(def m (reduce #(assoc %1 %2 (inc (%1 %2 0)))
{}
(re-seq #".." s)))
(def c (count m))
(doseq [[k x] m]
(println k ":" (/ x c))))
Run Code Online (Sandbox Code Playgroud)
我正在尝试将并发应用于我的程序,并且我想使用 pmap,但我不确定如何将它应用到我当前的代码中。该功能对于单核是正确的,但理想情况下,我想以某种方式用 pmap 替换 reduce 并获得相同的结果。
首先,你试图弥补的函数被称为frequencies:
user> (frequencies [1 2 1 3 1 4 4])
;;=> {1 3, 2 1, 3 1, 4 2}
Run Code Online (Sandbox Code Playgroud)
它确实是单线程的。所以让我们试着让它平行。
最初的方法 withreduce是正确的方向,虽然它也不是并行的,但它可以用来与 clojure 的标准库并发工具(即reducers )并行。
首先,让我们稍微重写您的 reducer 函数,以做同样的事情,但以更惯用的方式(它是可选的,但有利于可读性):
#(assoc %1 %2 (inc (%1 %2 0))) => #(update %1 %2 (fnil inc 0))
然后我们可以使用以下方法进行并行减少fold:
(require '[clojure.core.reducers :as r])
(defn pfreq [data]
(r/fold
(partial merge-with +)
(fn [acc k] (update acc k (fnil inc 0)))
data))
Run Code Online (Sandbox Code Playgroud)
这个想法是它按块分割你的集合(如果它足够长),然后将块的结果与merge-with:
user> (pfreq [1 2 1 3 1 4 1 5 2])
;;=> {1 4, 2 2, 3 1, 4 1, 5 1}
Run Code Online (Sandbox Code Playgroud)
另请注意,该系列应该是“可折叠的”。默认情况下,持久向量和映射是可折叠的,re-seq结果不是,所以你应该先把它转换成 vector: (vec (re-seq #"..x" s)),否则你不会得到任何并行化,回退到 plain reduce。
您显然可以使用 pmap 来解决这个问题,使用相同的策略:拆分 -> 映射 -> 组合:
(defn pfreq2 [chunk-size data]
(->> data
(partition-all chunk-size)
(pmap frequencies)
(apply merge-with +)))
Run Code Online (Sandbox Code Playgroud)
但这并不像reducers管道那样灵活和强大。