mik*_*era 16 messaging channel clojure core.async
我正在使用带有core.async的Clojure,并且我希望对通过通道处理的消息数量设置速率限制.
特别是我想:
实现这一目标的最佳方法是什么?
Eri*_*and 16
问题分解:
我正在通过一个简单地在循环中组合通道的解决方案来解决问题.
常见的速率限制算法称为令牌桶.您有一个固定大小的令牌桶,您可以固定的速率添加令牌.只要您有令牌,就可以发送消息.
桶的大小决定了"突发性"(你能达到最大速率的速度),并且速率决定了最大平均速率.这些将是我们代码的参数.
让我们创建一个以给定速率发送消息(无关紧要)的频道.(#1)
(defn rate-chan [burstiness rate]
(let [c (chan burstiness) ;; bucket size is buffer size
delta (/ 1000 rate)]
(go
(while true
(>! c :go) ;; send a token, will block if bucket is full
(<! (timeout delta)))) ;; wait a little
c))
Run Code Online (Sandbox Code Playgroud)
现在我们想要一个通过速率限制另一个频道的频道.(#2)
(defn limit-chan [in rc]
(let [c (chan)]
(go
(while true
(<! rc) ;; wait for token
(>! c (<! in)))) ;; pass message along
c))
Run Code Online (Sandbox Code Playgroud)
现在,如果没有消息等待,我们可以使用默认的这些通道:
(defn chan-with-default [in]
(let [c (chan)]
(go
(while true
;; take from in, or if not available, pass useful message
(>! c (alts! [in] :default :rate-exceeded))))
c))
Run Code Online (Sandbox Code Playgroud)
现在我们有了解决问题的所有方面.
(def rchan (-> (chan)
(limit-chan (rate-chan 100 1000))
(chan-with-default)))
Run Code Online (Sandbox Code Playgroud)
就#4而言,这不是绝对最快的解决方案.但它是一个使用可组合部件并且可能足够快的部件.如果你想要它更快,你可以做一个循环来完成所有这些(而不是将它分解成更小的函数).最快的是自己实现接口.
这是使用atom计算发送消息数量并定期将其重置为零的一种方法:
(def counter (atom 0))
(def time-period 1000) ;milliseconds
(def max-rate 1000) ;max number of messages per time-period
(def ch (chan))
(defn alert-client []
(println "That's enough!"))
(go (while true (<! (timeout time-period)) (reset! counter 0))) ; reset counter periodically
(defn process [msg]
(if (> (swap! counter inc) max-rate) (alert-client) (put! ch msg)))
(doseq [x (range 1001)] (process x)) ; throw some messages at the channel
Run Code Online (Sandbox Code Playgroud)
您需要更多代码来使用来自频道的消息.如果您不确定是否能够以限制它们的速率持续使用消息,则可能需要指定通道缓冲区大小或通道类型(删除/滑动).