在Clojure中限制core.async通道

mik*_*era 16 messaging channel clojure core.async

我正在使用带有core.async的Clojure,并且我希望对通过通道处理的消息数量设置速率限制.

特别是我想:

  • 定义速率限制,例如每秒1,000条消息
  • 只要消息数小于速率限制,就可以(并且及时)处理消息
  • 如果超过速率限制,请对事件进行某种明智的替代处理(例如,告诉客户稍后再试)
  • 具有相当低的开销

实现这一目标的最佳方法是什么?

Eri*_*and 16

问题分解:

  1. 定义速率限制,例如每秒1,000条消息
  2. 只要消息数小于速率限制,就可以(并且及时)处理消息
  3. 如果超过速率限制,请对事件进行某种明智的替代处理(例如,告诉客户稍后再试)
  4. 具有相当低的开销

我正在通过一个简单地在循环中组合通道的解决方案来解决问题.

常见的速率限制算法称为令牌桶.您有一个固定大小的令牌桶,您可以固定的速率添加令牌.只要您有令牌,就可以发送消息.

桶的大小决定了"突发性"(你能达到最大速率的速度),并且速率决定了最大平均速率.这些将是我们代码的参数.

让我们创建一个以给定速率发送消息(无关紧要)的频道.(#1)

(defn rate-chan [burstiness rate]
  (let [c (chan burstiness) ;; bucket size is buffer size
        delta (/ 1000 rate)]
    (go
      (while true
        (>! c :go) ;; send a token, will block if bucket is full
        (<! (timeout delta)))) ;; wait a little
    c))
Run Code Online (Sandbox Code Playgroud)

现在我们想要一个通过速率限制另一个频道的频道.(#2)

(defn limit-chan [in rc]
  (let [c (chan)]
    (go 
      (while true
        (<! rc) ;; wait for token
        (>! c (<! in)))) ;; pass message along
    c))
Run Code Online (Sandbox Code Playgroud)

现在,如果没有消息等待,我们可以使用默认的这些通道:

(defn chan-with-default [in]
  (let [c (chan)]
    (go
      (while true
        ;; take from in, or if not available, pass useful message
        (>! c (alts! [in] :default :rate-exceeded))))
    c))
Run Code Online (Sandbox Code Playgroud)

现在我们有了解决问题的所有方面.

(def rchan (-> (chan)
               (limit-chan (rate-chan 100 1000))
               (chan-with-default)))
Run Code Online (Sandbox Code Playgroud)

就#4而言,这不是绝对最快的解决方案.但它是一个使用可组合部件并且可能足够快的部件.如果你想要它更快,你可以做一个循环来完成所有这些(而不是将它分解成更小的函数).最快的是自己实现接口.


bru*_*nov 9

我写了一个小库来解决这个问题.它的实现与Eric Normand相似,但是对于高吞吐量通道采取了一些措施(对于接近毫秒的睡眠时间,超时并不精确).

它还支持全局限制一组通道,并支持功能限制.

检查它在这里.

  • Throttler是一个很好的图书馆!谢谢! (2认同)

opt*_*evo 6

这是使用atom计算发送消息数量并定期将其重置为零的一种方法:

(def counter (atom 0))

(def time-period 1000) ;milliseconds

(def max-rate 1000) ;max number of messages per time-period

(def ch (chan))

(defn alert-client []
  (println "That's enough!"))

(go (while true (<! (timeout time-period)) (reset! counter 0))) ; reset counter periodically 

(defn process [msg]
  (if (> (swap! counter inc) max-rate) (alert-client) (put! ch msg)))

(doseq [x (range 1001)] (process x)) ; throw some messages at the channel
Run Code Online (Sandbox Code Playgroud)

您需要更多代码来使用来自频道的消息.如果您不确定是否能够以限制它们的速率持续使用消息,则可能需要指定通道缓冲区大小或通道类型(删除/滑动).