Clojure 函数在执行之前等待另一个函数完成

Mat*_*ton 4 concurrency atomic clojure

我需要一个函数,当使用特定的输入参数调用时,执行提供的函数 g,但只有在另一个提供的函数 f 使用相同的输入参数完成执行之后。还有一个要求,当使用相同的输入args多次调用该函数时,f仅在第一次调用时执行一次,其他调用等待此完成,然后直接执行g。

编辑:该解决方案应该在不同线程上并行运行时工作,并且还应该有效地使用线程。例如,阻塞应该基于每个输入而不是整个函数。

我对该功能的第一次尝试如下:

(defn dependent-func
  ([f g]
   (let [mem (atom {})]
     (fn [& args]
       (->> (get (locking mem
                   (swap! mem (fn [latch-map args]
                                (if (contains? latch-map args)
                                  latch-map
                                  (let [new-latch (CountDownLatch. 1)
                                        new-latch-map (assoc latch-map args new-latch)]
                                    (->> (Thread. #(do (apply f args)
                                                       (.countDown new-latch)))
                                         (.start))
                                new-latch-map))) args)) args)
            (.await))
       (apply g args)))))
Run Code Online (Sandbox Code Playgroud)

这似乎满足我的要求,并且等待 f 是基于每个输入的,所以我对此相对满意。最初我希望只使用交换!进行内存更新,但不幸的是交换!明确指出该函数位于 swap! 可以被多次调用(我在测试中看到过这一点)。因此,我最终不得不在更新时锁定内存,这真的很难看。

我确信一定有一种更干净的方法可以比我更好地利用 Closure 的并发机制,但到目前为止我还没有找到它。

任何建议将不胜感激。

谢谢,

马特。

Art*_*ldt 6

Clojure 的futurepromise、 和组合deliver非常适合启动一个进程并让多个线程等待它完成。

  • Future 用于在后台启动一个线程(它可以做更多的事情,尽管在这个例子中我不需要它)

  • Promise 用于在准备好后立即返回一个包含答案的对象。

  • 交付用于在准备好后提供承诺的答案。

我还将等待部分拆分为它自己的函数,以使代码更易于理解,因此我可以使用内置的 memoize 函数:

这个问题是一个很好的例子,说明何时使用 Promise 和 Deliver 而不仅仅是 Future。

因为我们要在两次运行该函数不安全的地方使用 memoize,所以我们需要小心这两个调用不要完全相同地同时进入 memoize。因此,我们将仅锁定进入 memoize 的那一刻,而不是锁定 memoized 函数的持续时间。

hello.core> (def lock [])
#'hello.core/lock
Run Code Online (Sandbox Code Playgroud)

每次使用给定的参数集调用 f 时,此函数将始终返回相同的 future 对象,除非我们需要通过将其包装在执行锁定的函数中来确保 memoize 安全(您也可以为此使用代理)

hello.core> (def wait-for-function-helper             
              (memoize (fn [f args]
                         (let [answer (promise)]
                           (println "waiting for function " f " with args" args)
                           (future (deliver answer (apply f args)))
                           answer))))

#'hello.core/wait-for-function-helper
hello.core> (defn wait-for-function [& args]
              (locking lock
                (apply wait-for-function-helper args)))
#'hello.core/wait-for-function
Run Code Online (Sandbox Code Playgroud)

现在我们编写实际的 dependent-func 函数,该函数使用安全记忆的、未来生成的 wait-for-function 函数。

hello.core> (defn dependent-func [f g & args]
              @(wait-for-function f args)
              (apply g args))
#'hello.core/dependent-func
Run Code Online (Sandbox Code Playgroud)

并定义一个慢速操作来查看它的运行情况:

hello.core> (defn slow-f-1 [x]
              (println "starting slow-f-1")
              (Thread/sleep 10000)
              (println "finishing slow-f-1")
              (dec x))
#'hello.core/slow-f-1
Run Code Online (Sandbox Code Playgroud)

为了测试它,我们想同时启动两个相同的函数

hello.core> (do (future
                  (println "first" (dependent-func slow-f-1 inc 4)))
                (future
                  (println "second" (dependent-func slow-f-1 inc 4))))

waiting for function  
#object[clojure.core$future_call$reify__6736 0x40534083 {:status :pending, :val nil}] with args (4)
#object[hello.core$slow_f_1 0x4f9b3396 hello.core$slow_f_1@4f9b3396]
starting slow-f-1
finishing slow-f-1
second
first
5
5
Run Code Online (Sandbox Code Playgroud)

如果我们再次调用它,我们会看到 Slow-f-1 只运行过一次:

hello.core> (do (future
                  (println "first" (dependent-func slow-f-1 inc 4)))
                (future
                  (println "second" (dependent-func slow-f-1 inc 4))))

#object[clojure.core$future_call$reify__6736 0x3935ea29 {:status :pending, :val nil}]
first 5
second 5
Run Code Online (Sandbox Code Playgroud)