pet*_*pun 5 parallel-processing logic functional-programming clojure terminate
我想定义一个谓词,以某些谓词与相应的输入作为输入(它们可以作为懒惰的调用序列提供),并行运行它们并计算逻辑或结果,以使谓词调用终止返回时true,整个计算也终止(return true)。
除了提供时间优化之外,这还有助于避免在某些情况下不终止(某些谓词调用可能不会终止)。实际上,undefined该谓词将非终止解释为第三个值,它模拟Kleene K3逻辑
(初始居中Kleene代数中的连接)的or运算。
Haskell家族的情况与此类似。Clojure中有任何(最好是简单的)方法可以做到这一点吗?
编辑:在阅读评论后,我决定添加一些说明。
(a)首先,在线程池用尽之后发生的事情不太重要。我认为创建一个足以满足我们需求的线程池是一个合理的约定。
(b)最关键的要求是谓词调用开始并行运行,并且一旦谓词调用终止返回true,所有其他运行线程都会被中断。预期的行为是:
true:并行或返回truefalse换句话说,它的行为就像在由下式给出的3元素晶格中的加入false< undefined< true,与undefined代表非终止。
(c)并行或应该能够接受许多谓词和许多谓词输入(每个对应于一个谓词)作为输入。但是,如果将延迟序列作为输入,那就更好了。然后,命名并行或pany(对于“任何并行”),我们可以进行如下调用:
(pany (map (comp eval list) predicates inputs))(pany (map (comp eval list) predicates (repeat input)))(pany (map (comp eval list) (repeat predicate) inputs)) 相当于 (pany (map predicate (unchunk inputs)))最后,我认为,很自然地要求这样的事情pany,例如双重pall,机制或机制,以构建此类尽早终止的并行约简,以易于实现,甚至内置于面向并行性的语言(如Clojure)中。
我将根据约简函数定义我们的谓词。实际上,我们可以重新实现所有 Clojure 迭代函数来支持这种并行操作,但我将仅使用 reduce 作为示例。
我将定义一个计算函数。我只会使用同一个,但没有什么可以阻止你拥有多个。如果累加到 1000,则该函数为“真”。
(defn computor [acc val]
(let [new (+' acc val)] (if (> new 1000) (reduced new) new)))
(reduce computor 0 (range))
;; =>
1035
(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation
;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)]
[computor 0 (range Long/MIN_VALUE 0)]])
Run Code Online (Sandbox Code Playgroud)
现在让我们开始讨论这个问题的核心内容。我想在每个计算中迈出一步,如果其中一个计算完成,我想返回它。事实上,使用 pmap 一次一步非常慢 - 工作单元太小,不值得线程化。在这里,我进行了更改,在继续之前对每个工作单元进行 1000 次迭代。您可能会根据您的工作负载和步骤的成本来调整它。
(defn p-or-reducer* [reductions]
(let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
complete (some #(if (empty? (second %)) (last (first %))) splits)]
(or complete (recur (map second splits)))))
Run Code Online (Sandbox Code Playgroud)
然后我将其包装在驱动程序中。
(defn p-or [s]
(p-or-reducer* (map #(apply reductions %) s)))
(p-or predicates)
;; =>
1035
Run Code Online (Sandbox Code Playgroud)
CPU并行度在哪里插入?p-or-reducer* 中的 s/map/pmap/ 应该可以做到。我建议仅并行化第一个操作,因为这将驱动减少序列进行计算。
(defn p-or-reducer* [reductions]
(let [splits (pmap #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
complete (some #(if (empty? (second %)) (last (first %))) splits)]
(or complete (recur (map second splits)))))
(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
[computor 0 (range)]))
(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not
Run Code Online (Sandbox Code Playgroud)
定义一个高性能的通用版本是非常困难的。在不知道每次迭代成本的情况下,很难得出有效的并行策略 - 如果一次迭代需要 10 秒,那么我们可能会一次执行一步。如果需要 100 纳秒,那么我们需要一次执行多个步骤。