懒惰的分区

tem*_*ept 8 clojure lazy-evaluation lazy-sequences

我有一个项目源,并希望单独处理它们具有相同的键功能值的运行.在Python中,这看起来像

for key_val, part in itertools.groupby(src, key_fn):
  process(key_val, part)
Run Code Online (Sandbox Code Playgroud)

这个解决方案是完全懒惰的,即如果process不尝试存储整个内容part,代码将在O(1)内存中运行.

Clojure解决方案

(doseq [part (partition-by key-fn src)]
  (process part))
Run Code Online (Sandbox Code Playgroud)

不那么懒:它完全实现了每个部分.问题是,src可能有很长的项目具有相同的key-fn值,并意识到它们可能导致OOM.

我发现这个讨论的地方声称以下函数(稍微修改了帖子中的命名一致性)是懒惰的

(defn lazy-partition-by [key-fn coll]
  (lazy-seq
    (when-let [s (seq coll)]
      (let [fst (first s)
            fv (key-fn fst)
            part (lazy-seq (cons fst (take-while #(= fv (key-fn %)) (next s))))]
        (cons part (lazy-partition-by key-fn (drop-while #(= fv (key-fn %)) s)))))))
Run Code Online (Sandbox Code Playgroud)

但是,我不明白为什么它不会受到OOM的影响:cons单元的两个部分都有一个引用s,所以虽然process消耗part,s但是实现但不是垃圾收集.它只有在drop-while遍历时才有资格获得GC part.

所以,我的问题是:

  1. 我是否正确lazy-partition-by不够懒惰?
  2. 是否有partition-by保证内存要求的实现,只要我在part开始实现下一个时没有对前一个引用?

编辑:这是Haskell中一个懒惰的实现:

lazyPartitionBy :: Eq b => (a -> b) -> [a] -> [[a]]
lazyPartitionBy _ [] = []
lazyPartitionBy keyFn xl@(x:_) = let
  fv = keyFn x
  (part, rest) = span ((== fv) . keyFn) xl
  in part : lazyPartitionBy keyFn rest
Run Code Online (Sandbox Code Playgroud)

span实现可以看出,partrest隐式共享状态.我想知道这种方法是否可以翻译成Clojure.

ama*_*loy 7

我在这些场景中使用的经验法则(即,您希望单个输入序列生成多个输出序列的那些)是,在以下三个理想属性中,您通常只能有两个:

  1. 效率(仅输入一次输入序列,因此不能保持其头部)
  2. 懒惰(仅按需生产元素)
  3. 没有共享的可变状态

clojure.core中的版本选择(1,3),但通过一次生成整个分区放弃(2).Python和Haskell都选择(1,2),虽然它并不是很明显:Haskell根本没有可变状态吗?好吧,它对所有内容(不仅仅是序列)的懒惰评估意味着所有表达式都是thunk,它们最初是空白的,只有在需要它们时才会被写入; 实行span,就像你说的,共享相同的thunk span p xs'两个输出序列,所以无论哪一个首先需要它"发送",它与其它序列的结果,这是必要的,以保持一定距离影响的行动其他不错的属性.

正如您所指出的,您链接的备选Clojure实现选择(2,3).

问题是,对于partition-by,拒绝(1)(2)意味着你持有某个序列的头部:输入或输出之一.因此,如果您想要一个可以处理任意大输入的任意大分区的解决方案,您需要选择(1,2).在Clojure中有几种方法可以做到这一点:

  1. 采用Python方法:返回更像迭代器而不是seq的东西 - seqs对非变异做出更强的保证,并承诺你可以安全地遍历它们多次等等.如果不是seqs的seq,你返回迭代器迭代器,然后从任何一个迭代器中消耗项目可以自由地改变或使其他迭代器无效.这可以保证消费按顺序发生,并且可以释放内存.
  2. 采用Haskell方法:通过大量调用手动打包所有内容,delay并要求客户端force根据需要经常调用以获取数据.这在Clojure中会更加丑陋,并且会大大增加您的堆栈深度(在非平凡的输入上使用它可能会破坏堆栈),但理论上这是可能的.
  3. 通过在输出seq之间协调一些可变数据对象来编写更多Clojure风格(但仍然非常不寻常),每个都在需要时从其中任何一个请求更新.

我很确定这三种方法中的任何一种都是可能的,但说实话,它们都很难,而且根本不是很自然.Clojure的序列抽象只是不容易产生你想要的数据结构.我的建议是,如果你需要这样的东西,并且分区可能太大而不适合放置,你只需接受一种稍微不同的格式并自己做更多的簿记:避免(1,2,3)难以避免产生多重因素完全输出序列!

因此,你可以接受一种稍微丑陋的格式,而不是((2 4 6 8) (1 3 5) (10 12) (7))你的输出格式.这既不难产生也不难消费,虽然写出来有点冗长乏味.(partition-by even? [2 4 6 8 1 3 5 10 12 7])([::key true] 2 4 6 8 [::key false] 1 3 5 [::key true] 10 12 [::key false] 7)

这是生产函数的一个合理实现:

(defn lazy-partition-by [f coll]
  (lazy-seq
    (when (seq coll)
      (let [x (first coll)
            k (f x)]
        (list* [::key k] x
         ((fn part [k xs]
            (lazy-seq
              (when (seq xs)
                (let [x (first xs)
                      k' (f x)]
                  (if (= k k')
                    (cons x (part k (rest xs)))
                    (list* [::key k'] x (part k' (rest xs))))))))
          k (rest coll)))))))
Run Code Online (Sandbox Code Playgroud)

以下是如何使用它,首先定义一个reduce-grouped隐藏分组格式细节的泛型,然后是一个示例函数count-partition-sizes来输出每个分区的键和大小而不在内存中保留任何序列:

(defn reduce-grouped [f init groups]
  (loop [k nil, acc init, coll groups]
    (if (empty? coll)
      acc
      (if (and (coll? (first coll)) (= ::key (ffirst coll)))
        (recur (second (first coll)) acc (rest coll))
        (recur k (f k acc (first coll)) (rest coll))))))

(defn count-partition-sizes [f coll]
  (reduce-grouped (fn [k acc _]
                    (if (and (seq acc) (= k (first (peek acc))))
                      (conj (pop acc) (update-in (peek acc) [1] inc))
                      (conj acc [k 1])))
                  [] (lazy-partition-by f coll)))

user> (lazy-partition-by even? [2 4 6 8 1 3 5 10 12 7])
([:user/key true] 2 4 6 8 [:user/key false] 1 3 5 [:user/key true] 10 12 [:user/key false] 7)
user> (count-partition-sizes even? [2 4 6 8 1 3 5 10 12 7])
[[true 4] [false 3] [true 2] [false 1]]
Run Code Online (Sandbox Code Playgroud)

编辑:再看一遍,我并不是真的相信我的reduce-grouped用处比(reduce f init (map g xs))它更有用,因为它并没有真正给你任何明确指示关键时刻的变化.所以如果你确实需要知道一个团队什么时候发生变化,你就会想要一个更聪明的抽象,或者使用我的原创lazy-partition-by而没有任何"聪明"的包装.


A. *_*ebb 3

尽管这个问题引发了关于语言设计的非常有趣的思考,但实际问题是您想要在常量内存中的分区上进行处理。实际问题只需稍加反转即可解决。

不要处理返回分区序列的函数的结果,而是将处理函数传递到生成分区的函数中。然后,您可以以封闭的方式控制状态。

首先,我们将提供一种将序列的消耗与尾部状态融合在一起的方法。

(defn fuse [coll wick]
  (lazy-seq 
   (when-let [s (seq coll)]
     (swap! wick rest)
     (cons (first s) (fuse (rest s) wick)))))
Run Code Online (Sandbox Code Playgroud)

然后是修改后的版本partition-by

(defn process-partition-by [processfn keyfn coll] 
  (lazy-seq
    (when (seq coll)
      (let [tail (atom (cons nil coll))
            s (fuse coll tail)
            fst (first s)
            fv (keyfn fst)
            pred #(= fv (keyfn %))
            part (take-while pred s)
            more (lazy-seq (drop-while pred @tail))] 
        (cons (processfn part) 
              (process-partition-by processfn keyfn more))))))
Run Code Online (Sandbox Code Playgroud)

注意:对于 O(1) 的内存消耗,processfn必须是急切的消费者!所以 while(process-partition-by identity key-fn coll)和 一样(partition-by key-fn coll),因为identity不消耗分区,所以内存消耗不是恒定的。


测试

(defn heavy-seq [] 
  ;adjust payload for your JVM so only a few fit in memory
  (let [payload (fn [] (long-array 20000000))]
   (map #(vector % (payload)) (iterate inc 0))))

(defn my-process [s] (reduce + (map first s)))

(defn test1 []
  (doseq [part (partition-by #(quot (first %) 10) (take 50 (heavy-seq)))]
    (my-process part)))

(defn test2 []
  (process-partition-by 
    my-process #(quot (first %) 20) (take 200 (heavy-seq))))

so.core=> (test1)
OutOfMemoryError Java heap space  [trace missing]

so.core=> (test2)
(190 590 990 1390 1790 2190 2590 2990 3390 3790)
Run Code Online (Sandbox Code Playgroud)