Clojure agents consuming from a queue

eri*_*kcw 26 queue concurrency clojure

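I'm trying to figure out the best way to use agents to consume items from a message queue (Amazon SQS). Right now I have a function (process-queue-item) that takes an item from the queue and processes it.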

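I want to process these items concurrently, but I can't work out how to control the agents. Basically I want to keep all of the agents as busy as possible without pulling lots of items off the queue and building up a backlog (I'll be running this on several machines, so items need to stay in the queue until they are actually needed).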

Can anyone give me some pointers on improving my implementation?

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (def agents (for [x (range 20)] (agent x)))

  (loop [loop-count 0]

    (if (< @active-agents 20)
      (doseq [agent agents]
        (if (agent-errors agent)
          (clear-agent-errors agent))
        ;should skip this agent until later if it is still busy processing (not sure how)
        (send-off agent process-queue-item)))

    ;(apply await-for (* 10 1000) agents)
    (Thread/sleep  10000)
    (logging/info (str "ACTIVE AGENTS " @active-agents))
    (if (>= loop-count 10)
      (do (logging/info (str "done, let's cleanup " loop-count))
          (doseq [agent agents]
            (if (agent-errors agent)
              (clear-agent-errors agent)))
          (apply await agents)
          (shutdown-agents))
      (recur (inc loop-count)))))

cgr*_*and 23

(let [switch (atom true) ; a switch to stop workers
      workers (doall 
                (repeatedly 20 ; 20 workers pulling and processing items from SQS
                  #(future (while @switch
                             ;; retrieve an item from Amazon SQS and process it
                             (process-queue-item nil))))))]
  (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
  (reset! switch false) ; stop !
  (doseq [worker workers] @worker)) ; waiting for all workers to be done

  • @AlexB Good catch, it's not even a 1.4 issue: the # should have been there all along. I fixed the code, thanks! (3 upvotes)
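  • This no longer works in 1.4 (`future` and `future-call` don't return an `IFn`, which `repeatedly` requires). You can easily wrap the future in a function, though, by prefixing `(future` with `#`. (2 upvotes)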
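A self-contained version of the worker pattern above, runnable at a plain REPL. It is a sketch under stated assumptions: `queue` is a hypothetical in-memory `LinkedBlockingQueue` standing in for SQS, and `process-queue-item`, `processed`, and `start-workers` are illustrative names, not an SQS client API. Each worker polls with a 100 ms timeout so an idle worker waits instead of spinning, and it only takes an item when it is ready to process it, so no backlog accumulates on the consuming machine. The `#(...)` around `future` is the fix discussed in the comments: `repeatedly` calls its argument as a zero-arg function.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical in-memory stand-in for Amazon SQS, preloaded with 100 items.
(def ^LinkedBlockingQueue queue
  (LinkedBlockingQueue. ^java.util.Collection (vec (range 100))))

(def processed (atom []))   ; what the workers produced, kept for checking

(defn process-queue-item
  "Hypothetical worker step: waits up to 100 ms for one item and squares it.
  Returns nil if nothing arrived, so an idle worker does not busy-loop."
  []
  (when-some [item (.poll queue 100 TimeUnit/MILLISECONDS)]
    (swap! processed conj (* item item))
    item))

(defn start-workers
  "Starts n futures that loop until the switch is turned off. Returns a
  zero-arg fn that turns the switch off and waits for every worker."
  [n]
  (let [switch  (atom true)
        ;; #(...) wraps each future in a fn, which is what repeatedly needs
        workers (doall (repeatedly n #(future (while @switch (process-queue-item)))))]
    (fn stop! []
      (reset! switch false)
      (run! deref workers))))

;; Usage: run 20 workers for two seconds (an arbitrary stopping rule), then stop.
(def stop-workers! (start-workers 20))
(Thread/sleep 2000)
(stop-workers!)
```

After `stop-workers!` returns, each worker has finished its current iteration, so any item it was holding has been processed rather than dropped.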

Tim*_*ley 6

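What you're asking for is a way to keep handing out tasks, but with an upper limit. A simple way to do that is to use a semaphore to enforce the limit. Here is how I would approach it: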

(let [limit (.availableProcessors (Runtime/getRuntime))
      ; note: you might choose limit 20 based upon your problem description
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block. 
    If n submitted futures are still running, then submit-future-call
    blocks until one of them completes, where n is the number of
    available processors."
    [^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n submitted futures are still running, then submit-future blocks
  until one of them completes, where n is the number of available
  processors."
  [& body] `(submit-future-call (fn [] ~@body)))

;; Example REPL session on a 2-processor machine:
;; user=> (submit-future (reduce + (range 100000000)))
;; #<core$future_call$reify__5782@6c69d02b: :pending>
;; user=> (submit-future (reduce + (range 100000000)))
;; #<core$future_call$reify__5782@38827968: :pending>
;; user=> (submit-future (reduce + (range 100000000)))
;; ;; blocks at this point until one of the previous two futures completes
;; #<core$future_call$reify__5782@214c4ac9: :pending>
;; ;; then submits the job
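The limit rests on two behaviors of `java.util.concurrent.Semaphore`: `acquire` takes a permit and blocks when none are left, and `release` hands a permit back. A minimal REPL-style sketch with two permits (the count of 2 is an arbitrary choice matching the 2-processor example; `tryAcquire` is used only so the demonstration does not block):

```clojure
(import 'java.util.concurrent.Semaphore)

(def sem (Semaphore. 2))                    ; two slots, like limit = 2 processors
(.acquire sem)                              ; first submitted future takes a slot
(.acquire sem)                              ; second one takes the last slot
(def permits-when-full (.availablePermits sem))
(def got-third? (.tryAcquire sem))          ; fails: a plain .acquire would block here
(.release sem)                              ; a future finishes and frees its slot
(def got-after-release? (.tryAcquire sem))  ; succeeds now that a slot is free
```

This is why `submit-future-call` releases the permit in a `finally` inside the future: the slot is returned even if the task throws.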

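Now all that remains is to coordinate how the tasks themselves are processed. It sounds like you already have a mechanism for that: loop over (submit-future (process-queue-item)).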
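One way that driving loop could look, as a sketch under assumptions: a `LinkedBlockingQueue` stands in for SQS, and `bounded-submitter`, `process-queue-item`, and `drain!` are hypothetical names. The semaphore logic mirrors `submit-future-call`, but the limit is a parameter rather than being fixed at the processor count. Workers take items off the queue themselves, so at most `limit` items are out of the queue at any moment; `running` and `peak` record how many tasks ran at once.

```clojure
(import '(java.util.concurrent Semaphore LinkedBlockingQueue
                               RejectedExecutionException))

(defn bounded-submitter
  "Returns a fn that runs a zero-arg task in a future, blocking the caller
  while `limit` submitted tasks are still running."
  [limit]
  (let [sem (Semaphore. (int limit))]
    (fn [task]
      (.acquire sem)                   ; take a slot, or block until one frees up
      (try
        (future (try (task) (finally (.release sem))))
        (catch RejectedExecutionException e
          (.release sem)               ; nothing was submitted, give the slot back
          (throw e))))))

;; Hypothetical in-memory stand-in for SQS, preloaded with 50 items.
(def ^LinkedBlockingQueue queue
  (LinkedBlockingQueue. ^java.util.Collection (vec (range 50))))

(def running (atom 0))   ; tasks executing right now
(def peak (atom 0))      ; highest value `running` has reached
(def results (atom []))

(defn process-queue-item
  "Hypothetical worker: takes one item if available and squares it slowly.
  Returns nil when the queue was already empty."
  []
  (swap! peak max (swap! running inc))
  (try
    (when-some [item (.poll queue)]
      (Thread/sleep 5)                 ; simulate slow processing
      (swap! results conj (* item item))
      item)
    (finally (swap! running dec))))

(defn drain!
  "Submits workers through a bounded submitter until the queue looks empty,
  then waits for every submitted future."
  [limit]
  (let [submit (bounded-submitter limit)
        futs   (loop [acc []]
                 (if (.isEmpty queue)
                   acc
                   (recur (conj acc (submit process-queue-item)))))]
    (run! deref futs)))

(drain! 4)
```

Near the end, a few submitted workers may find the queue already empty and simply return nil. With a real SQS queue "empty" is only approximate, so a long-running consumer would more likely loop until an explicit stop signal, like the `switch` atom in the first answer.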