Mat*_*ton 4 concurrency atomic clojure
我需要一个函数,当使用特定的输入参数调用时,执行提供的函数 g,但只有在另一个提供的函数 f 使用相同的输入参数完成执行之后。还有一个要求,当使用相同的输入args多次调用该函数时,f仅在第一次调用时执行一次,其他调用等待此完成,然后直接执行g。
编辑:该解决方案应该在不同线程上并行运行时工作,并且还应该有效地使用线程。例如,阻塞应该基于每个输入而不是整个函数。
我对该功能的第一次尝试如下:
(defn dependent-func
([f g]
(let [mem (atom {})]
(fn [& args]
(->> (get (locking mem
(swap! mem (fn [latch-map args]
(if (contains? latch-map args)
latch-map
(let [new-latch (CountDownLatch. 1)
new-latch-map (assoc latch-map args new-latch)]
(->> (Thread. #(do (apply f args)
(.countDown new-latch)))
(.start))
new-latch-map))) args)) args)
(.await))
(apply g args)))))
Run Code Online (Sandbox Code Playgroud)
这似乎满足我的要求,并且等待 f 是基于每个输入的,所以我对此相对满意。最初我希望只使用交换!进行内存更新,但不幸的是交换!明确指出该函数位于 swap! 可以被多次调用(我在测试中看到过这一点)。因此,我最终不得不在更新时锁定内存,这真的很难看。
我确信一定有一种更干净的方法可以比我更好地利用 Closure 的并发机制,但到目前为止我还没有找到它。
任何建议将不胜感激。
谢谢,
马特。
Clojure 的future、promise、 和组合deliver非常适合启动一个进程并让多个线程等待它完成。
Future 用于在后台启动一个线程(它可以做更多的事情,尽管在这个例子中我不需要它)
Promise 用于在准备好后立即返回一个包含答案的对象。
交付用于在准备好后提供承诺的答案。
我还将等待部分拆分为它自己的函数,以使代码更易于理解,因此我可以使用内置的 memoize 函数:
这个问题是一个很好的例子,说明何时使用 Promise 和 Deliver 而不仅仅是 Future。
因为我们要在两次运行该函数不安全的地方使用 memoize,所以我们需要小心这两个调用不要完全相同地同时进入 memoize。因此,我们将仅锁定进入 memoize 的那一刻,而不是锁定 memoized 函数的持续时间。
hello.core> (def lock [])
#'hello.core/lock
Run Code Online (Sandbox Code Playgroud)
每次使用给定的参数集调用 f 时,此函数将始终返回相同的 future 对象,除非我们需要通过将其包装在执行锁定的函数中来确保 memoize 安全(您也可以为此使用代理)
hello.core> (def wait-for-function-helper
(memoize (fn [f args]
(let [answer (promise)]
(println "waiting for function " f " with args" args)
(future (deliver answer (apply f args)))
answer))))
#'hello.core/wait-for-function-helper
hello.core> (defn wait-for-function [& args]
(locking lock
(apply wait-for-function-helper args)))
#'hello.core/wait-for-function
Run Code Online (Sandbox Code Playgroud)
现在我们编写实际的 dependent-func 函数,该函数使用安全记忆的、未来生成的 wait-for-function 函数。
hello.core> (defn dependent-func [f g & args]
@(wait-for-function f args)
(apply g args))
#'hello.core/dependent-func
Run Code Online (Sandbox Code Playgroud)
并定义一个慢速操作来查看它的运行情况:
hello.core> (defn slow-f-1 [x]
(println "starting slow-f-1")
(Thread/sleep 10000)
(println "finishing slow-f-1")
(dec x))
#'hello.core/slow-f-1
Run Code Online (Sandbox Code Playgroud)
为了测试它,我们想同时启动两个相同的函数。
hello.core> (do (future
(println "first" (dependent-func slow-f-1 inc 4)))
(future
(println "second" (dependent-func slow-f-1 inc 4))))
waiting for function
#object[clojure.core$future_call$reify__6736 0x40534083 {:status :pending, :val nil}] with args (4)
#object[hello.core$slow_f_1 0x4f9b3396 hello.core$slow_f_1@4f9b3396]
starting slow-f-1
finishing slow-f-1
second
first
5
5
Run Code Online (Sandbox Code Playgroud)
如果我们再次调用它,我们会看到 Slow-f-1 只运行过一次:
hello.core> (do (future
(println "first" (dependent-func slow-f-1 inc 4)))
(future
(println "second" (dependent-func slow-f-1 inc 4))))
#object[clojure.core$future_call$reify__6736 0x3935ea29 {:status :pending, :val nil}]
first 5
second 5
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1751 次 |
| 最近记录: |