为什么pmap | reducers/map没有使用所有cpu核心?

eug*_*ecm 5 clojure reducers pmap cheshire

我正在尝试解析一个包含一百万行的文件,每一行都是一个json字符串,其中包含一些关于书籍的信息(作者,内容等).我正在使用iota加载文件,因为我的程序会抛出一个OutOfMemoryError如果我尝试使用slurp.我也用cheshire来解析字符串.该程序只需加载一个文件并计算所有书籍中的所有单词.

我的第一次尝试包括pmap做繁重的工作,我认为这基本上会利用我所有的cpu核心.

(ns multicore-parsing.core
  (:require [cheshire.core :as json]
            [iota :as io]
            [clojure.string :as string]
            [clojure.core.reducers :as r]))


(defn words-pmap
  [filename]
  (letfn [(parse-with-keywords [str]
            (json/parse-string str true))
          (words [book]
            (string/split (:contents book) #"\s+"))]
    (->>
     (io/vec filename)
     (pmap parse-with-keywords)
     (pmap words)
     (r/reduce #(apply conj %1 %2) #{})
     (count))))
Run Code Online (Sandbox Code Playgroud)

虽然它似乎确实使用了所有核心,但每个核心很少使用超过50%的容量,我的猜测是它与pmap的批量大小有关,因此我偶然发现了相对较旧的问题,其中一些注释引用了该clojure.core.reducers库.

我决定使用reducers/map以下方法重写函数:

(defn words-reducers
  [filename]
  (letfn [(parse-with-keywords [str]
            (json/parse-string str true))
          (words [book]
            (string/split (:contents book) #"\s+"))]
  (->>
   (io/vec filename)
   (r/map parse-with-keywords)
   (r/map words)
   (r/reduce #(apply conj %1 %2) #{})
   (count))))
Run Code Online (Sandbox Code Playgroud)

但是cpu的使用情况更糟糕,与之前的实现相比,它甚至需要更长的时间才能完成:

multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 20899.088919 msecs"
546
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 28790.976455 msecs"
546
Run Code Online (Sandbox Code Playgroud)

我究竟做错了什么?在解析大文件时,mmap loading + reducers是正确的方法吗?

编辑:是我正在使用的文件.

EDIT2:以下是时间iota/seq而不是iota/vec:

multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 160981.224565 msecs"
546
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 160296.482722 msecs"
546
Run Code Online (Sandbox Code Playgroud)

Pau*_*her 3

我不相信减速器将是适合您的解决方案,因为它们根本不能很好地处理惰性序列(减速器将通过惰性序列给出正确的结果,但不会很好地并行化) )。

\n\n

您可能想看一下《七周内的七个并发模型》一书中的示例代码(免责声明:我是作者),它解决了类似的问题(计算每个单词在维基百科上出现的次数)。

\n\n

给定维基百科页面列表,此函数按顺序计算单词数量(get-words返回页面中的单词序列):

\n\n
(defn count-words-sequential [pages]\n  (frequencies (mapcat get-words pages)))\n
Run Code Online (Sandbox Code Playgroud)\n\n

这是一个并行版本,使用pmap它确实运行得更快,但仅快 1.5 倍左右:

\n\n
(defn count-words-parallel [pages]\n  (reduce (partial merge-with +)\n    (pmap #(frequencies (get-words %)) pages)))\n
Run Code Online (Sandbox Code Playgroud)\n\n

它只快 1.5 倍左右的原因是因为它reduce成为瓶颈\xe2\x80\x94it\的调用(partial merge-with +)每个页面在 4 核机器上,批量合并 100 个页面可将性能提高至约 3.2 倍:

\n\n
(defn count-words [pages]\n  (reduce (partial merge-with +)\n    (pmap count-words-sequential (partition-all 100 pages))))\n
Run Code Online (Sandbox Code Playgroud)\n