如何使用core.async正确批处理邮件?

jwh*_*ark 7 clojure clojurescript core.async

我想通过计数超时在core.async chan上批量消息(即10ms或10条消息,以先到者为准).Tim Baldridge 有关于批处理的视频,但它在core.async中使用了已弃用的函数,并且不使用传感器.我正在寻找类似以下的东西......

(defn batch [in out max-time max-count]
  ...
 )
Run Code Online (Sandbox Code Playgroud)

Mic*_*zyk 14

传感器不应该真正关注配料功能 - 作为in通道上的接收器,它将看到由该通道上的任何传感器转换的值,并且任何听取的读者out将依次看到由该通道的换能器转换的值.

至于一个实现,下面的函数将从最后一批输出后接收批量的max-count项目in,或者多次到达max-time,并输出它们out,当输入通道关闭时关闭,受输入通道的传感器(如果有的话)和任何听取的听众out也将如上所述应用该频道的换能器:

(defn batch [in out max-time max-count]
  (let [lim-1 (dec max-count)]
    (async/go-loop [buf [] t (async/timeout max-time)]
      (let [[v p] (async/alts! [in t])]
        (cond
          (= p t)
          (do
            (async/>! out buf)
            (recur [] (async/timeout max-time)))

          (nil? v)
          (if (seq buf)
            (async/>! out buf))

          (== (count buf) lim-1)
          (do
            (async/>! out (conj buf v))
            (recur [] (async/timeout max-time)))

          :else
          (recur (conj buf v) t))))))
Run Code Online (Sandbox Code Playgroud)