我在原子中保持进程注册表。
我想每个启动一个进程,并且仅启动一个进程(特别是core.async go-loop)id。
但是,您不应该在中执行副作用swap!,因此这段代码不好:
(swap! processes-atom
(fn [processes]
(if (get processes id)
processes ;; already exists, do nothing
(assoc processes id (create-process! id)))))
Run Code Online (Sandbox Code Playgroud)
我将如何正确执行此操作?
我看过了locking,它以一个对象作为锁的监视器。我希望每个id都是动态的,都有自己的锁。
您可以使用 手动执行此操作locking,如 OlegTheCat 所示,这通常是一个很好的方法。然而,在评论中,您指出,最好避免整个原子在生成进程时被锁定,而且这也可以以一种令人惊讶的简单方式实现:而不是拥有从 pid 到进程的映射,有一个从 pid 到进程延迟的映射。这样,您可以非常便宜地添加新的延迟,并且仅通过在调用之外取消引用延迟来实际创建进程swap!。取消引用延迟将阻止等待该特定延迟,因此需要同一进程的多个线程不会互相干扰,但原子本身将被解锁,从而允许需要不同进程的线程获得它。
下面是该方法的示例实现,以及您的问题所暗示的其他变量的示例定义,以使代码可以按原样运行:
(def process-results (atom []))
(defn create-process! [id]
;; pretend creating the process takes a long time
(Thread/sleep (* 1000 (rand-int 3)))
(future
;; running it takes longer, but happens on a new thread
(Thread/sleep (* 1000 (rand-int 10)))
(swap! process-results conj id)))
(def processes-atom (atom {}))
(defn cached-process [id]
(-> processes-atom
(swap! (fn [processes]
(update processes id #(or % (delay (create-process! id))))))
(get id)
(deref)))
Run Code Online (Sandbox Code Playgroud)
当然,仅cached-process当您已经定义了其他内容时才需要。以及一个示例运行,以显示进程已成功重用:
(defn stress-test [num-processes]
(reset! process-results [])
(reset! processes-atom {})
(let [running-processes (doall (for [i (range num-processes)]
(cached-process (rand-int 10))))]
(run! deref running-processes)
(deref process-results)))
user> (time (stress-test 40))
"Elapsed time: 18004.617869 msecs"
[1 5 2 0 9 7 8 4 3 6]
Run Code Online (Sandbox Code Playgroud)