Question by eri*_*kcw (26 votes), tagged: queue, concurrency, clojure
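I'm trying to figure out the best way to use agents to consume items from a message queue (Amazon SQS). Right now I have a function (process-queue-item) that takes an item from the queue and processes it.
I want to process these items concurrently, but I can't wrap my head around how to control the agents. Basically I want to keep all of the agents as busy as possible without pulling too many items from the queue and building up a backlog (I'll be running this on several machines, so items need to stay in the queue until they are really needed).
Can anyone give me some pointers on improving my implementation?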
;; assumes a logging library aliased as logging, e.g. clojure.tools.logging
(require '[clojure.tools.logging :as logging])

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;; retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (let [agents (doall (for [x (range 20)] (agent x)))]
    (loop [loop-count 0]
      (when (< @active-agents 20)
        (doseq [a agents]
          (when (agent-errors a)
            (clear-agent-errors a))
          ;; should skip this agent until later if it is still busy processing (not sure how)
          (send-off a process-queue-item)))
      ;; (apply await-for (* 10 1000) agents)
      (Thread/sleep 10000)
      (logging/info (str "ACTIVE AGENTS " @active-agents))
      (if (>= loop-count 10)
        (do (logging/info (str "done, let's cleanup " loop-count))
            (doseq [a agents]
              (when (agent-errors a)
                (clear-agent-errors a)))
            (apply await agents)
            (shutdown-agents))
        (recur (inc loop-count))))))
Answer by cgr*_*and (23 votes):
(let [switch (atom true) ; a switch to stop workers
      workers (doall
                (repeatedly 20 ; 20 workers pulling and processing items from SQS
                  #(future (while @switch
                             ;; retrieve one item from Amazon SQS and process it,
                             ;; e.g. with the question's process-queue-item
                             (process-queue-item nil)))))]
  (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
  (reset! switch false) ; stop!
  (doseq [worker workers] @worker)) ; wait for all workers to be done
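To show why a fixed pool of looping futures avoids a backlog, here is a minimal runnable sketch of the same idea. It assumes a local java.util.concurrent.LinkedBlockingQueue as a stand-in for SQS (this is not the SQS API), and process-item and run-demo are hypothetical names. Each future takes one item only when it is idle, so everything not currently being worked on stays in the queue. The demo stops once the queue is drained instead of after a fixed sleep.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

(defn process-item
  "Hypothetical stand-in for the real work: simulate a little processing,
  then record the item as done."
  [done item]
  (Thread/sleep 5)
  (swap! done conj item))

(defn run-demo
  "Fills a local queue with n-items items, starts n-workers futures that each
  take one item at a time while the switch is on, and returns the number of
  distinct items processed."
  [n-workers n-items]
  (let [q      (LinkedBlockingQueue.)
        switch (atom true)   ; a switch to stop the workers
        done   (atom #{})]   ; items that have been processed
    (doseq [i (range n-items)] (.put q i))
    (let [workers (doall
                    (repeatedly n-workers
                                #(future
                                   (while @switch
                                     ;; wait up to 100 ms for one item (loosely like an SQS
                                     ;; long-poll receive); an idle worker takes only one item
                                     (when-let [item (.poll q 100 TimeUnit/MILLISECONDS)]
                                       (process-item done item))))))]
      ;; demo stop rule: wait until the queue is drained
      (while (not (.isEmpty q)) (Thread/sleep 10))
      (reset! switch false)
      (doseq [w workers] @w)   ; wait for in-flight items to finish
      (count @done))))

(def processed (run-demo 4 100))
(println "processed" processed "items") ; prints: processed 100 items
(shutdown-agents) ; lets a standalone script exit without waiting on idle pool threads
```

With real SQS, the poll call would become a receive request for a single message; the rest of the structure stays the same.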
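Another answer:

What you're asking for is a way to keep handing out tasks, but with some upper limit. A simple way to do this is to use a semaphore to coordinate the limit. Here is how I would approach it: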
(let [limit (.availableProcessors (Runtime/getRuntime))
      ;; note: you might choose a limit of 20 based upon your problem description
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block.
    If n futures have already been submitted, then submit-future-call blocks
    until the completion of another future, where n is the number of
    available processors."
    [^Callable task]
    ;; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ;; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ;; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."
  [& body]
  `(submit-future-call (fn [] ~@body)))

;; Example REPL session on a 2-processor PC:
;; user=> (submit-future (reduce + (range 100000000)))
;; #<core$future_call$reify__5782@6c69d02b: :pending>
;; user=> (submit-future (reduce + (range 100000000)))
;; #<core$future_call$reify__5782@38827968: :pending>
;; user=> (submit-future (reduce + (range 100000000)))
;; ;; blocks at this point until one of the previous two futures completes
;; #<core$future_call$reify__5782@214c4ac9: :pending>
;; ;; then submits the job
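Now you just need to coordinate how the tasks themselves are taken from the queue. It sounds like you already have a mechanism for that: loop (submit-future (process-queue-item)).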
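Below is a minimal, self-contained sketch of that loop. It assumes a local LinkedBlockingQueue in place of SQS, and submit-bounded, process-queue-item (a stand-in, not the question's function) and run-demo are hypothetical names. submit-bounded is a compact variant of submit-future-call that takes the semaphore as an argument instead of closing over it. The loop stops once the queue is empty, and the code records the peak number of tasks running at once, which the semaphore keeps at or below the limit.

```clojure
(import '(java.util.concurrent Semaphore LinkedBlockingQueue RejectedExecutionException))

(defn submit-bounded
  "Hypothetical compact variant of submit-future-call: blocks until sem has a
  free permit, then runs task in a future that releases the permit when done."
  [^Semaphore sem task]
  (.acquire sem)
  (try
    (future (try (task) (finally (.release sem))))
    (catch RejectedExecutionException e
      (.release sem) ; no task was actually submitted
      (throw e))))

(def running (atom 0)) ; tasks currently executing
(def peak (atom 0))    ; highest value running has reached

(defn process-queue-item
  "Hypothetical stand-in: takes one item from q (a local queue instead of
  SQS), simulates some work, and returns the item, or nil if q was empty."
  [^LinkedBlockingQueue q]
  (swap! peak max (swap! running inc))
  (try
    (when-let [item (.poll q)]
      (Thread/sleep 10)
      item)
    (finally (swap! running dec))))

(defn run-demo
  "Submits tasks while the queue still has items; the semaphore lets at most
  limit of them run at once. Returns the processed count and observed peak."
  [limit n-items]
  (let [sem (Semaphore. (int limit))
        q   (LinkedBlockingQueue.)]
    (doseq [i (range n-items)] (.put q i))
    ;; the loop from the answer, (submit-future (process-queue-item)),
    ;; plus a stop condition for the demo
    (let [futures   (loop [fs []]
                      (if (.isEmpty q)
                        fs
                        (recur (conj fs (submit-bounded sem #(process-queue-item q))))))
          processed (count (keep deref futures))] ; deref waits for every task
      {:processed processed
       :peak      @peak})))

(def result (run-demo 3 30))
(println "processed" (:processed result) "items, peak concurrency" (:peak result))
(shutdown-agents) ; lets a standalone script exit without waiting on idle pool threads
```

Because the permit is acquired before the future is created, the submitting loop itself blocks when all slots are busy, so it never pulls work faster than the tasks finish.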