当mapcat在core.async中破坏背压时,内存泄漏在哪里?

Pet*_*ott 9 memory-leaks asynchronous clojure backpressure core.async

我在Clojure中编写了一些core.async代码,当我运行它时,它消耗了所有可用的内存并因错误而失败.似乎mapcat在core.async管道中使用可以打破压力.(由于超出本问题范围的原因,这是不幸的.)

下面是一些通过计算:x输入和输出mapcat换能器来演示问题的代码:

(ns mapcat.core
  (:require [clojure.core.async :as async]))

(defn test-backpressure [n length]
  (let [message (repeat length :x)
        input (async/chan)
        transform (async/chan 1 (mapcat seq))
        output (async/chan)
        sent (atom 0)]
    (async/pipe input transform)
    (async/pipe transform output)
    (async/go
      (dotimes [_ n]
        (async/>! input message)
        (swap! sent inc))
      (async/close! input))
    (async/go-loop [x 0]
      (when (= 0 (mod x (/ (* n length) 10)))
        (println "in:" (* @sent length) "out:" x))
      (when-let [_ (async/<! output)]
        (recur (inc x))))))

=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
Run Code Online (Sandbox Code Playgroud)

制片人远远领先于消费者.

看来我不是第一个发现这个的人.但是这里给出的解释似乎并没有涵盖它.(虽然它确实提供了足够的解决方法.)从概念上讲,我希望生产者能够领先,但只能通过可能在通道中缓冲的少数消息的长度.

我的问题是,所有其他消息在哪里?到第四行输出7000 :xs是下落不明的.

Pet*_*ott 2

更新 2020-01-14:内存泄漏现已修复。

对于“内存泄漏在哪里?”这个问题有两种可能的解释。

首先,数据保存在哪里?答案似乎就在紧邻扩展变换下游的通道缓冲区中。

通道默认使用FixedBufferclojure.core.async.impl.buffers/FixedBuffer),它可以判断它是否已满,但不反对超满。

其次,哪一段代码导致缓冲区满了?这(如果我错了,请纠正我)似乎是在(clojure.core.async.impl.channels/ManyToManyChannel)的方法take!,其中对缓冲区的第一次调用发生在任何调用发生之前。ManyToManyChanneladd!full?

似乎take!假设它可以为它删除的每一项至少添加一项到缓冲区。对于诸如此类的长期运行的膨胀传感器来说,mapcat这并不总是一个安全的假设。

通过将此行更改为(when (and (.hasNext iter) (not (impl/full? buf)))core.async 的本地副本,我可以使问题中的代码按预期运行。(注意,我对 core.async 的理解不足以保证这是适合您的用例的强大解决方案。)

更新 2016-09-17:现在存在一个问题:http://dev.clojure.org/jira/browse/ASYNC-178

更新 2020 年 1 月 14 日:此问题现已修复: https: //clojure.atlassian.net/browse/ASYNC-210(尽管之前的票证已被关闭为“已拒绝”)