ssc*_*eck 7 clojure transducer core.async
我想创建clojure.core.async另一个只过滤特定消息的频道.因此我找到了一个名为filter <的函数.
=> (def c1 (chan))
=> (def c2 (filter< even? c1))
=> (put! c1 1)
=> (put! c1 2)
=> (<!! c2)
2
但该函数及其朋友被标记为已弃用:
不推荐使用 - 此功能将被删除.改用传感器
有两种使用与传感器渠道,如一些方法chan与xform参数.如何使用传感器从现有通道构建新通道?
我对此做了一些研究,发现了一些有趣的文章(第一和第二),然后得到了一些有用的东西pipeline
(require '[clojure.core.async :as async :refer [chan <!! pipeline put!]])
(def c1 (chan))
(def c2 (chan))
(pipeline 4 c2 (filter even?) c1)
(put! c1 1)
(put! c1 2)
(<!! c2)
;;=> 2
我链接的第二篇文章使得管道函数周围的一些辅助函数更加清晰:
(defn ncpus []
  (.availableProcessors (Runtime/getRuntime)))
(defn parallelism []
  (+ (ncpus) 1))
(defn add-transducer
  [in xf]
  (let [out (chan (buffer 16))]
    (pipeline (parallelism) out xf in)
    out))
然后你可以简单地将频道绑在一起
(def c1 (chan))
(def c2 (add-transducer c1 (filter even?))
要完成答案,您发现自己可以以类似的方式使用管道:
(defn pipe-trans
  [ci xf]
  (let [co (chan 1 xf)]
    (pipe ci co)
    co))
(def c1 (chan))
(def c2 (pipe-trans c1 (filter even?)))