Work queue in Clojure

bde*_*ham 10 queue clojure

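I'm using a Clojure application to access data from a web API. I'm going to be making a lot of requests, and many of those requests will lead to more requests being made, so I want to keep the request URLs in a queue that leaves 60 seconds between subsequent downloads.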

Following this blog post, I put this together:

(def queue-delay (* 1000 60)) ; one minute

(defn offer!
  [q x]
  (.offerLast q x)
  q)

(defn take!
  [q]
  (.takeFirst q))

(def my-queue (java.util.concurrent.LinkedBlockingDeque.))

(defn- process-queue-item
  [item]
  (println ">> " item)   ; this would be replaced by downloading `item`
  (Thread/sleep queue-delay))

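If I include a (future (process-queue-item (take! my-queue))) in my code, then at the REPL I can call (offer! my-queue "something") and I see ">> something" printed immediately. So far so good! But I need the queue to persist for the entire time my program is running. The (future ...) call I just mentioned pulls one item off the queue as soon as it is available, but I want something that watches the queue continuously and calls process-queue-item whenever an item is available.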

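Also, contrary to Clojure's usual love of concurrency, I want to make sure that only one request is made at a time and that my program waits 60 seconds between each request and the next.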

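I think this Stack Overflow question is relevant, but I'm not sure how to adapt it to do what I want. How do I poll my queue continuously and make sure that only one request runs at a time?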
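A minimal sketch of one common way to get both properties, assuming a single long-lived consumer thread is acceptable: one thread loops forever, blocking on .takeFirst while the deque is empty and sleeping after each item. Because that thread is the only consumer, two requests can never overlap. start-worker! and its delay-ms parameter are hypothetical names, not part of the question's code or of any library.

```clojure
(import '(java.util.concurrent LinkedBlockingDeque))

(defn start-worker!
  "Hypothetical helper, not from the question or any library. Starts one
  daemon thread that loops forever: it blocks on .takeFirst until q has an
  item, calls (f item), then sleeps delay-ms before taking the next item.
  Only this thread consumes q, so at most one (f item) call runs at a time.
  Returns the Thread; calling .interrupt on it ends the loop."
  [^LinkedBlockingDeque q f delay-ms]
  (let [t (Thread.
           ^Runnable
           (fn []
             (try
               (loop []
                 (f (.takeFirst q))              ; blocks while q is empty
                 (Thread/sleep (long delay-ms))  ; gap before the next item
                 (recur))
               (catch InterruptedException _
                 nil))))]                        ; interrupted: leave the loop
    (.setDaemon t true)  ; don't keep the JVM alive just for this loop
    (.start t)
    t))
```

With the question's definitions this would be (start-worker! my-queue process-queue-item 0), since process-queue-item already sleeps for queue-delay itself. An exception thrown by f escapes the loop and ends the thread, so f may need its own try/catch.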

bde*_*ham 1

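I ended up rolling my own small library, which I called simple-queue. You can read the full documentation on GitHub, but here is the complete source. I'm not going to keep this answer updated, so if you'd like to use the library, please get the source from GitHub.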

(ns com.github.bdesham.simple-queue)

(defn new-queue
  "Creates a new queue. Each trigger from the timer will cause the function f
  to be invoked with the next item from the queue. The queue begins processing
  immediately, which in practice means that the first item to be added to the
  queue is processed immediately."
  [f & opts]
  (let [options (into {:delaytime 1}
                      (select-keys (apply hash-map opts) [:delaytime])),
        delaytime (:delaytime options),
        queue {:queue (java.util.concurrent.LinkedBlockingDeque.)},
        task (proxy [java.util.TimerTask] []
               (run []
                 (let [item (.takeFirst (:queue queue)),
                       value (:value item),
                       prom (:promise item)]
                   (if prom
                     (deliver prom (f value))
                     (f value))))),
        timer (java.util.Timer.)]
    (.schedule timer task 0 (int (* 1000 delaytime)))
    (assoc queue :timer timer)))

(defn cancel
  "Permanently stops execution of the queue. If a task is already executing
  then it proceeds unharmed."
  [queue]
  (.cancel (:timer queue)))

(defn process
  "Adds an item to the queue, blocking until it has been processed. Returns
  (f item)."
  [queue item]
  (let [prom (promise)]
    (.offerLast (:queue queue)
                {:value item,
                 :promise prom})
    @prom))

(defn add
  "Adds an item to the queue and returns immediately. The value of (f item) is
  discarded, so presumably f has side effects if you're using this."
  [queue item]
  (.offerLast (:queue queue)
              {:value item,
               :promise nil}))
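One detail of the java.util.Timer approach worth knowing: in the JDK's Timer, a fixed-delay task (Timer.schedule with a period) has its next run scheduled relative to the moment the current run starts. Since the task here blocks in .takeFirst while the queue is empty, a run that waited longer than delaytime can be followed immediately by the next run, so two items added after an idle period may be processed back to back. If the gap must always hold, ScheduledExecutorService.scheduleWithFixedDelay measures the delay from the end of one run to the start of the next. Below is a minimal sketch under that assumption; new-fixed-delay-queue, enqueue! and stop! are hypothetical names, not part of simple-queue.

```clojure
(import '(java.util.concurrent Executors LinkedBlockingDeque TimeUnit))

(defn new-fixed-delay-queue
  "Hypothetical variant of new-queue, not part of simple-queue. Each run of
  the scheduled task blocks until the deque has an item and calls (f item).
  scheduleWithFixedDelay starts the next run delay-ms after the previous run
  has finished, so each (f item) call starts at least delay-ms after the
  previous one returned."
  [f delay-ms]
  (let [q (LinkedBlockingDeque.)
        exec (Executors/newSingleThreadScheduledExecutor)
        task (fn []
               (try
                 (f (.takeFirst q))
                 (catch InterruptedException _
                   ;; stop! interrupts a run blocked in takeFirst; re-set the flag
                   (.interrupt (Thread/currentThread)))
                 (catch Exception e
                   ;; an exception escaping the task would cancel all later runs
                   (println "queue task failed:" (.getMessage e)))))]
    (.scheduleWithFixedDelay exec ^Runnable task 0 (long delay-ms) TimeUnit/MILLISECONDS)
    {:queue q, :executor exec}))

(defn enqueue!
  "Adds item to the back of the queue and returns immediately."
  [{q :queue} item]
  (.offerLast ^LinkedBlockingDeque q item))

(defn stop!
  "Cancels future runs and interrupts a run that is waiting for an item."
  [{exec :executor}]
  (.shutdownNow ^java.util.concurrent.ExecutorService exec))
```

enqueue! plays the role of q/add here; a blocking q/process equivalent could be layered on top with a promise in the same way the library does it.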

An example of using this queue to get return values:

(require '[com.github.bdesham.simple-queue :as q])

(def url-queue (q/new-queue slurp :delaytime 30))
(def github (q/process url-queue "https://github.com"))
(def google (q/process url-queue "http://www.google.com"))

The calls to q/process block, so there will be a 30-second delay between the two def statements.
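The blocking in q/process comes from a promise: process puts {:value item, :promise prom} on the deque and derefs prom, and the timer task later delivers (f value) into it. Below is a stripped-down sketch of that same hand-off, with a plain thread standing in for the timer; work, consume-one! and process-blocking are hypothetical names used only for illustration.

```clojure
(import '(java.util.concurrent LinkedBlockingDeque))

;; Illustrative names only; these are not simple-queue's functions.
(def work (LinkedBlockingDeque.))

(defn consume-one!
  "Stands in for simple-queue's TimerTask: takes one entry, computes
  (f value) and delivers the result into the entry's promise."
  [f]
  (let [{v :value, p :promise} (.takeFirst ^LinkedBlockingDeque work)]
    (deliver p (f v))))

(defn process-blocking
  "Mirrors the shape of simple-queue's process: enqueue the item with a
  fresh promise, then block on @p until a consumer delivers the result."
  [item]
  (let [p (promise)]
    (.offerLast ^LinkedBlockingDeque work {:value item, :promise p})
    @p))
```

With the real library the consumer only runs once per delaytime, which is why each q/process call can wait up to that long.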


An example of using this queue purely for side effects:

(defn cache-url
  [{url :url, filename :filename}]
  (spit (java.io.File. filename)
        (slurp url)))

(def url-queue (q/new-queue cache-url :delaytime 30))
(q/add url-queue {:url "https://github.com",
                  :filename "github.html"})    ; returns immediately
(q/add url-queue {:url "https://google.com",
                  :filename "google.html"})    ; returns immediately

Here the calls to q/add return immediately.
