Clojure中Web服务的异步作业队列

Joe*_*per 7 queue jobs web-services clojure lamina-clojure

目前我正在尝试使用RESTful API构建一个Web服务来处理一些长时间运行的任务(作业).

这个想法是用户通过执行POST来提交作业,该POST返回一些用于检查作业状态的URL,该URL还包含结果的URL.一旦作业完成(即某些值写入数据库),结果URL将返回相应的信息(而不是没有结果),并且作业URL将指示已完成的状态.

不幸的是,计算非常密集,因此一次只能运行一个,因此需要对作业进行排队.

在伪事情中需要这样的东西

(def job-queue (atom queue)) ;; some queue 
(def jobs (atom {}))

(defn schedule-job [params] 
  ;; schedules the job into the queue and 
  ;; adds the job to a jobs map for checking status via GET
  ;; note that the job should not  be evaluated until popped from the queue
)

(POST "/analyze" [{params :params}] 
 (schedulde-job params))

(GET "job/:id" [:d] 
 (get @jobs id))

;; Some function that pops the next item from the queue 
;; and evaluates it when the previous item is complete
;; Note: should not terminate when queue is empty! 
Run Code Online (Sandbox Code Playgroud)

我看过Lamina允许异步处理,但它似乎不适合我的需要.

我的问题是如何在前一个队列完成后将队列出队并执行其任务,而不会在队列为空时终止,即永久处理传入的作业.

小智 9

java.util.concurrent.ExecutorService可能就是你想要的.这允许您提交作业以供以后执行,并返回一个可以查询的Future,以发现它是否已完成.

(import '[java.util.concurrent Callable Executors])

(def job-executor
  (Executors/newSingleThreadExecutor))

(def jobs (atom {}))

(defn submit-job [func]
  (let [job-id   (str (java.util.UUID/randomUUID))
        callable (reify Callable (call [_] (func))]
    (swap! jobs assoc job-id (.submit job-executor callable))
    job-id))

(use 'compojure.core)

(defroutes app
  (POST "/jobs" [& params]
    (let [id (submit-job #(analyze params))]
      {:status 201 :headers {"Location" (str "/jobs/" id)}}))
  (GET "/jobs/:id" [id]
    (let [job-future (@jobs id)]
      (if (.isDone job-future)
        (.get job-future)
        {:status 404}))))
Run Code Online (Sandbox Code Playgroud)

  • `ExecutorService`不能替换为内置的`future`? (2认同)