Clojure:Group-by太慢(1300万行文件)

Jos*_*ine 6 group-by clojure incanter

情况

我有一个1300万行CSV,我想为每个组执行逻辑回归(incanter).我的文件是这样的(值只是样本)

ID Max Probability
1  1   0.5 
1  5   0.6
1  10  0.99
2  1   0.1
2  7   0.95
Run Code Online (Sandbox Code Playgroud)

所以我首先用csv阅读器阅读它,永远很好.

我有类似的东西:

( {"Id" "1", "Max" 1, "Probability" 0.5} {"Id" "1", "Max" 5, "Probability" 0.6} etc.
Run Code Online (Sandbox Code Playgroud)

我希望通过Id分组这些值,如果我没记错的话,那里有大约1.2百万的ID.(我用Python做了大熊猫,速度非常快)

这是我读取和格式化文件的功能(它适用于较小的数据集):

  (defn read-file
  []
    (let [path (:path-file @config)
          content-csv (take-csv path \,)]
      (->> (group-by :Id content-csv)
           (map (fn [[k v]]
                [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
           (into {}))))
Run Code Online (Sandbox Code Playgroud)

我想最后有类似的东西来执行逻辑回归(我很灵活,不需要向量:x和:y,seqs都可以)

{"1" {:x [1 5 10] :y [0.5 0.6 0.99]} "2" {:x [1 7] :y [0.1 0.95]} etc.
Run Code Online (Sandbox Code Playgroud)

问题

我在分组操作方面遇到了麻烦.我在CSV的输出上分别尝试了它,并且由于Java堆空间内存而没有消失,这将永远消失.我认为问题是我的mapv事情,但这是分组.

我想过使用reduce或reduce-kv但是我不知道如何将这些函数用于这种目的.

我不关心":x"和":y"的顺序(只要它们在它们之间是相同的,我的意思是x和y具有相同的索引......不是问题,因为它们是相同的线)和最终结果的Ids我读了那个组 - 保持顺序.也许这是操作成本高昂的?

如果遇到任何人,我会给你样本数据:

(def sample '({"Id" "1" "Max" 1 "Probability" 0.5} {"Id" "1" "Max" 5 "Probability" 0.6} {"Id" "1" "Max" 10 "Probability" 0.99} {"Id" "2" "Max" 1 "Probability" 0.1} {"Id" "2" "Max" 7 "Probability" 0.95}))
Run Code Online (Sandbox Code Playgroud)

其他选择

我有其他想法,但我不确定他们是"Clojure" - 友好.

  • 在Python中,由于函数的性质,并且因为文件已经被排序,而不是使用group-by,我在每个组的数据帧开始和结束索引中写入,因此我只需要直接选择子数据.

  • 我也可以加载一个id列表,而不是从Clojure计算它.喜欢

    (def ids'("1""2"等.

所以也许可以从以下开始:

{"1" {:x [] :y []} "2" {:x [] :y []} etc.
Run Code Online (Sandbox Code Playgroud)

从上一个seq开始,然后匹配每个ID上的大文件.

我不知道它实际上是否更有效率.

我拥有逻辑回归的所有其他功能,我只是缺少这一部分!谢谢 !

编辑

谢谢你的答案,我终于有了这个解决方案.

在我的project.clj文件中

 :jvm-opts ["-Xmx13g"])
Run Code Online (Sandbox Code Playgroud)

代码:

(defn data-group->map [group]
  {(:Id (first group))
   {:x (map :Max group)
    :y (map :Probability group)}})


(defn prob-cumsum [data]
  (cag/fmap
    (fn [x]
      (assoc x :y (reductions + (x :y))))
  data))


(defn process-data-splitter [data]
  (->> (partition-by :Id data)
       (map data-group->map)
       (into {})
       (prob-cumsum)))
Run Code Online (Sandbox Code Playgroud)

我把所有代码都包装好了.拆分大约需要5分钟,但我不需要超速.对于文件读取,内存使用量可以达到所有内存,对于sigmoid则可以更少.

lee*_*ski 6

如果您的文件按ID排序,则可以使用partition-by而不是group-by.

那么你的代码看起来像这样:

(defn data-group->map [group]
  [(:Id (first group))
   {:x (mapv :Max group)
    :y (mapv :Probability group)}])

(defn read-file []
  (let [path (:path-file @config)
        content-csv (take-csv path \,)]
    (->> content-csv
         (partition-by :Id)
         (map data-group->map)
         (into {}))))
Run Code Online (Sandbox Code Playgroud)

这应该加快它.然后你可以使用传感器加快速度

(defn read-file []
  (let [path (:path-file @config)
        content-csv (take-csv path \,)]
    (into {} (comp (partition-by :Id)
                   (map data-group->map))
          content-csv)))
Run Code Online (Sandbox Code Playgroud)

让我们做一些测试:

首先生成像你这样的大数据:

(def huge-data
  (doall (mapcat #(repeat 
                     1000000
                     {:Id % :Max 1 :Probability 10})
           (range 10))))
Run Code Online (Sandbox Code Playgroud)

我们有一千万个项目数据集,其中有百万{:Id 0 :Max 1 :Probability 10},百万,{:Id 1 :Max 1 :Probability 10}等等.

现在要测试的功能:

(defn process-data-group-by [data]
  (->> (group-by :Id data)
       (map (fn [[k v]]
              [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
       (into {})))

(defn process-data-partition-by [data]
  (->> data
       (partition-by :Id)
       (map data-group->map)
       (into {})))

(defn process-data-transducer [data]
  (into {} (comp (partition-by :Id) (map data-group->map)) data))
Run Code Online (Sandbox Code Playgroud)

现在时间测试:

(do (time (dorun (process-data-group-by huge-data)))
    (time (dorun (process-data-partition-by huge-data)))
    (time (dorun (process-data-transducer huge-data))))

"Elapsed time: 3377.167645 msecs"
"Elapsed time: 3707.03448 msecs"
"Elapsed time: 1462.955152 msecs"
Run Code Online (Sandbox Code Playgroud)

注意,partition-by产生延迟序列,而group-by应该实现整个集合.因此,如果您需要按组分组而不是整个地图,则可以(into {})更快地删除和访问每个数据:

(defn process-data-partition-by [data]
  (->> data
       (partition-by :Id)
       (map data-group->map)))
Run Code Online (Sandbox Code Playgroud)

校验:

user> (time (def processed-data (process-data-partition-by huge-data)))
"Elapsed time: 0.06079 msecs"
#'user/processed-data
user> (time (let [f (first processed-data)]))
"Elapsed time: 302.200571 msecs"
nil
user> (time (let [f (second processed-data)]))
"Elapsed time: 500.597153 msecs"
nil
user> (time (let [f (last processed-data)]))
"Elapsed time: 2924.588625 msecs"
nil
user.core> (time (let [f (last processed-data)]))
"Elapsed time: 0.037646 msecs"
nil
Run Code Online (Sandbox Code Playgroud)