Clojurescript:处理具有核心/异步通道的块的请求

var*_*nie 2 asynchronous channel clojure clojurescript

我有以下场景:

有一些服务我用来检索一些数据传递给我输入.

有一些输入参数我需要针对上述服务执行N个请求,收集输出并为每个输出执行一些CPU繁重的任务.

我试图使用核心/异步通道实现这一目标.

这是我的尝试(示意图)哪种有效,但它不表现我想要它.将对如何改进它的任何提示表示感谢.

(defn produce-inputs
  [in-chan inputs]
  (let input-names-seq (map #(:name %) inputs)]
    (doseq [input-name input-names-seq]
      (async/go
        (async/>! in-chan input-name)))))

(defn consume
  [inputs]
  (let [in-chan (async/chan 1)
        out-chan (async/chan 1)]
        (do
          (produce-inputs in-chan inputs)
          (async/go-loop []
                   (let [input-name (async/<! in-chan)]
                     (do
                         (retrieve-resource-from-service input-name 
                                                         ; response handler
                                                         (fn [resp]
                                                           (async/go
                                                             (let [result (:result resp)]
                                                               (async/>! out-chan result)))))
                         (when input-name
                           (recur)))))

     ; read from out-chan and do some heavy work for each entry
     (async/go-loop []
                   (let [result (async/<! out-chan)]
                         (do-some-cpu-heavy-work result))))))

; entry point
(defn run
  [inputs]
  (consume inputs))
Run Code Online (Sandbox Code Playgroud)

有没有办法更新它,以便每次不会有超过五个service(retrieve-resource-from-service)活动请求?

如果我的解释不明确,请提出问题,我会更新.

Ale*_*eph 5

您可以创建另一个通道作为令牌桶,以限制请求的速率.

有关使用令牌桶进行每秒速率限制的示例,请参阅此链接.

要限制同时请求的数量,您可以执行以下操作:

(defn consume [inputs]
  (let [in-chan (async/chan 1)
        out-chan (async/chan 1)
        bucket (async/chan 5)]
    ;; ...
    (dotimes [_ 5] (async/put! bucket :token))
    (async/go-loop []
      (let [input-name (async/<! in-chan)
            token (async/<! bucket)]
        (retrieve-resource-from-service
          input-name 
          ; response handler
          (fn [resp]
            (async/go
              (let [result (:result resp)]
                (async/>! out-chan result)
                (async/>! bucket token)))))
        (when input-name
          (recur))))
    ;; ...
    ))
Run Code Online (Sandbox Code Playgroud)

bucket创建一个新频道,并将五个项目放入其中.在触发请求之前,我们从存储桶中获取令牌,并在请求完成后将其放回.如果bucket频道中没有令牌,我们必须等到其中一个请求完成.

注意:这只是代码的草图,您可能需要更正它.特别是,如果您的retrieve-resource-from-service函数中有任何错误处理程序,则应该在发生错误时放回令牌以避免最终的死锁.