fs.createWriteStream does not apply backpressure when writing data to a file, causing high memory usage

Un1*_*Un1 15 javascript node.js

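Problem

I am trying to scan the directories of a drive (recursively walking every path) and write all the paths to a file as they are found, using fs.createWriteStream to keep memory usage low. It does not work: memory usage reaches 2 GB during the scan.

Expected

I expected fs.createWriteStream to manage memory/disk usage automatically at all times, keeping memory usage to a minimum by handling backpressure for me.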

const fs = require('fs')
const walkdir = require('walkdir')

let dir = 'C:/'

let options = {
  "max_depth": 0,
  "track_inodes": true,
  "return_object": false,
  "no_return": true,
}

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

let walker = walkdir(dir, options)

walker.on('path', (path) => {
  wstream.write(path + '\n')
})

walker.on('end', (path) => {
  wstream.end()
})
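To see why memory keeps growing, note that Writable.write() never blocks. It always queues the chunk and merely returns false once the internal buffer has reached the stream's highWaterMark. A 'path' handler that ignores that return value therefore lets the buffer grow without limit whenever the walker produces paths faster than the disk can absorb them. The sketch below reproduces this with a hypothetical in-memory sink (makeSlowSink, with an artificially small highWaterMark of 16 bytes) standing in for the real file stream.

```javascript
const { Writable } = require('stream');

// Hypothetical stand-in for the file stream: every write completes
// asynchronously, so chunks written in a tight loop pile up in its buffer.
function makeSlowSink() {
  return new Writable({
    highWaterMark: 16, // tiny limit (in bytes) so the effect shows quickly
    write(chunk, encoding, callback) {
      setImmediate(callback);
    },
  });
}

// Writes `count` lines without obeying write()'s return value,
// the same way the 'path' handler in the question does.
function writeIgnoringBackpressure(sink, count) {
  let falseReturns = 0;
  for (let i = 0; i < count; i++) {
    if (!sink.write(`line ${i}\n`)) falseReturns++;
  }
  // Everything that has not been flushed yet is still held in memory.
  return { falseReturns, bufferedBytes: sink.writableLength };
}

const demoSink = makeSlowSink();
// Most writes report "buffer full", yet every chunk is queued anyway.
console.log(writeIgnoringBackpressure(demoSink, 100));
demoSink.end();
```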

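Is it because I'm not using .pipe()? I tried creating a new Stream.Readable({ read() {} }) and pushing each path into it with readable.push(path) inside the .on('path') handler, but that didn't really work either.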
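Regarding the Readable attempt: with a no-op read() the stream has no way to ask the walker to slow down. push() does report a full buffer by returning false, but an event handler that keeps pushing regardless simply moves the unbounded buffer from the write stream into the Readable, which is presumably why it made no difference. A small sketch, with hypothetical function names and a 64-byte highWaterMark chosen only for illustration:

```javascript
const { Readable } = require('stream');

// A Readable with a no-op read(): data only enters via push() calls made
// from outside, like pushing from walker.on('path', ...).
function makeEventBackedReadable() {
  return new Readable({ read() {}, highWaterMark: 64 });
}

// Simulates 'path' events arriving while nobody consumes the stream yet.
// push()'s return value is counted but, as in a plain event handler, not obeyed.
function pushFromEvents(readable, paths) {
  let falseReturns = 0;
  for (const p of paths) {
    if (!readable.push(`${p}\n`)) falseReturns++;
  }
  return { falseReturns, bufferedBytes: readable.readableLength };
}

const demoPaths = Array.from({ length: 100 }, (_, i) => `C:/dir/file${i}.txt`);
const demoReadable = makeEventBackedReadable();
// Most pushes return false, yet every path ends up buffered anyway.
console.log(pushFromEvents(demoReadable, demoPaths));
```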

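Update:

Approach 2:

I tried the drain approach suggested in the answers, but it did not help much: it reduced memory usage to 500 MB (still far too much for a stream), but it slowed the code down dramatically (from a few seconds to several minutes).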
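For reference, the drain approach means: when write() returns false, stop producing and continue only after the stream emits 'drain'. A minimal promise-based sketch, assuming a Node version with events.once (available since Node 11.13); writeLine and writeAll are hypothetical helper names. The producer must actually stop while it waits; if the walker keeps emitting 'path' events in the meantime, memory still grows.

```javascript
const { once } = require('events');

// Writes one chunk; if the stream's buffer is full, waits for 'drain'
// before letting the caller produce the next chunk.
async function writeLine(ws, line) {
  if (!ws.write(line)) {
    await once(ws, 'drain');
  }
}

// A producer loop that respects backpressure, then closes the stream
// and waits until everything has been flushed.
async function writeAll(ws, lines) {
  for (const line of lines) {
    await writeLine(ws, `${line}\n`);
  }
  ws.end();
  await once(ws, 'finish');
}
```

Because the loop only awaits when the buffer is full, most writes proceed without yielding; the slowdown comes from waiting for the disk, which is the point of backpressure.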

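Approach 3:

I also tried readdirp, which uses less memory (~400 MB) and is faster, but I don't know how to pause it and use the drain approach there to reduce memory usage further: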

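const fs = require('fs')
const readdirp = require('readdirp')

let dir = 'C:/'
const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

readdirp(dir, {alwaysStat: false, type: 'files_directories'})
  .on('data', (entry) => {
    // write()'s return value is ignored here, so no backpressure is applied
    wstream.write(`${entry.fullPath}\n`)
  })
  .on('end', () => wstream.end())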
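One way to apply the drain idea to readdirp without calling pause() yourself, assuming readdirp returns a standard Readable stream (as its stream-based API suggests): consume it with for await...of and wait for 'drain' inside the loop. The loop only pulls the next entry after the current iteration finishes, so nothing is read from the source while it waits. writeEntries and toLine are hypothetical names. As the answer below notes, readdirp itself may not honour push()'s return value, so this bounds buffering on the writing side but not necessarily inside readdirp.

```javascript
const { once } = require('events');

// Copies every entry of an object-mode Readable (e.g. the stream returned by
// readdirp) into a writable stream, one line per entry.
// `toLine` is a hypothetical mapper, e.g. (entry) => entry.fullPath.
async function writeEntries(source, ws, toLine) {
  for await (const entry of source) {
    // The next entry is requested only after this body completes, so
    // awaiting 'drain' here also stops consumption of `source`.
    if (!ws.write(`${toLine(entry)}\n`)) {
      await once(ws, 'drain');
    }
  }
  ws.end();
  await once(ws, 'finish');
}

// Usage (not run here; paths taken from the question):
// writeEntries(readdirp('C:/', { type: 'files_directories' }),
//              fs.createWriteStream('C:/Users/USERNAME/Desktop/paths.txt'),
//              (entry) => entry.fullPath);
```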

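Approach 4:

I also tried doing this with a custom recursive walker. It uses only 30 MB of memory, which is what I want, but it is 10 times slower than the readdirp approach, and it is synchronous, which is undesirable: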

const fs = require('fs')
const path = require('path')

let dir = 'C:/'
function customRecursiveWalker(dir) {
  fs.readdirSync(dir).forEach(file => {
    let fullPath = path.join(dir, file)
    // Folders
    if (fs.lstatSync(fullPath).isDirectory()) {
      fs.appendFileSync("C:/Users/USERNAME/Desktop/paths.txt", `${fullPath}\n`)
      customRecursiveWalker(fullPath)
    } 
    // Files
    else {
      fs.appendFileSync("C:/Users/USERNAME/Desktop/paths.txt", `${fullPath}\n`)
    }  
  })
}
customRecursiveWalker(dir)

Lou*_*uis 5

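A preliminary observation: you have tried several approaches to get the result you want. One complication when comparing them is that they do not all do the same work. If you run your tests on a file tree that contains only regular files and no mount points, you can probably compare the approaches fairly. Once you add mount points, symbolic links, and so on, you may get different memory and time statistics simply because one approach excludes files that another includes.

I first tried a solution based on readdirp, but unfortunately that library seems buggy to me. Running it on my system gave inconsistent results: one run would output 10 MB of data, another run with the same input parameters would output 22 MB, then I would get yet another number, and so on. I looked at the code and found that it does not respect the return value of push: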

_push(entry) {
    if (this.readable) {
      this.push(entry);
    }
}

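According to the documentation, push may return false, in which case the Readable stream should stop producing data and wait until _read is called again. readdirp ignores that part of the specification entirely. Paying attention to the return value of push is essential for handling backpressure correctly. There are other things in that code that also seem questionable.

So I set that library aside and wrote a proof of concept showing how this can be done. The key parts are: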

  1. When push returns false, we must stop adding data to the stream. Instead, we record where we are and stop.

  2. We resume only when _read is called.
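Those two rules are the general contract for any custom Readable, not just a directory walker. A minimal sketch, using a hypothetical CountStream that emits the lines "0", "1", ... up to a limit: it pushes until push() returns false, keeps its position in an instance field, and continues only when Node calls _read() again. The maxBuffered field exists only so the behaviour can be observed.

```javascript
const { Readable } = require('stream');

// Emits "0\n", "1\n", ..., "(limit - 1)\n" while respecting backpressure.
class CountStream extends Readable {
  constructor(limit, options) {
    super(options);
    this._countLimit = limit;
    this._countNext = 0;  // where to resume on the next _read() call
    this.maxBuffered = 0; // largest buffer size seen, for observation only
  }

  _read() {
    while (this._countNext < this._countLimit) {
      const ok = this.push(`${this._countNext++}\n`);
      this.maxBuffered = Math.max(this.maxBuffered, this.readableLength);
      if (!ok) {
        // Rule 1: stop now; our position is already saved in _countNext.
        // Rule 2: Node calls _read() again when it wants more data.
        return;
      }
    }
    this.push(null); // signal the end of the stream
  }
}
```

Piped into a slow destination with stream.pipeline, such a stream only generates data on demand, so its buffer stays near highWaterMark regardless of the limit.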

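If you uncomment the console.log statements that print START and STOP, you will see them printed alternately on the console: we start, produce data until Node tells us to stop, then stop until Node tells us to start again, and so on.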

const stream = require("stream");
const fs = require("fs");
const { readdir, lstat } = fs.promises;
const path = require("path");

class Walk extends stream.Readable {
  constructor(root, maxDepth = Infinity) {
    super();

    this._maxDepth = maxDepth;

    // These fields allow us to remember where we were when we have to pause our
    // work.

    // The path of the directory to process when we resume processing, and the
    // depth of this directory.
    this._curdir = [root, 1];

    // The directories still to process.
    this._dirs = [this._curdir];

    // The list of files to process when we resume processing.
    this._files = [];

    // The index in `this._files` at which to continue processing when we resume.
    this._ix = 0;

    // A flag recording whether or not the fetching of files is currently going
    // on.
    this._started = false;
  }

  async _fetch() {
    // Recall where we were by loading the state in local variables.
    let files = this._files;
    let dirs = this._dirs;
    let [dir, depth] = this._curdir;
    let ix = this._ix;

    while (true) {
      // If we've gone past the end of the files we were processing, then
      // just forget about them. This simplifies the code that follows a bit.
      if (ix >= files.length) {
        ix = 0;
        files = [];
      }

      // Read directories until we have files to process.
      while (!files.length) {
        // We've read everything, end the stream.
        if (dirs.length === 0) {
          // This is how the stream API requires us to indicate the stream has
          // ended.
          this.push(null);

          // We're no longer running.
          this._started = false;
          return;
        }

        // Here, we get the next directory to process and get the list of
        // files in it.
        [dir, depth] = dirs.pop();

        try {
          files = await readdir(dir, { withFileTypes: true });
        }
        catch (ex) {
          // This is a proof-of-concept. In a real application, you should
          // determine what exceptions you want to ignore (e.g. EPERM).
        }
      }

      // Process each file.
      for (; ix < files.length; ++ix) {
        const dirent = files[ix];
        // Don't include in the results those files that are not directories,
        // files or symbolic links.
        if (!(dirent.isFile() || dirent.isDirectory() || dirent.isSymbolicLink())) {
          continue;
        }

        const fullPath = path.join(dir, dirent.name);
        if (dirent.isDirectory() && depth < this._maxDepth) {
          // Keep track that we need to walk this directory.
          dirs.push([fullPath, depth + 1]);
        }

        // Finally, we can put the data into the stream!
        if (!this.push(`${fullPath}\n`)) {
          // If the push returned false, we have to stop pushing results to the
          // stream until _read is called again, so we have to stop.

          // Uncomment this if you want to see when the stream stops.
          // console.log("STOP");

          // Record where we were in our processing.
          this._files = files;
          // The element at ix *has* been processed, so ix + 1.
          this._ix = ix + 1;
          this._curdir = [dir, depth];

          // We're stopping, so indicate that!
          this._started = false;
          return;
        }
      }
    }
  }

  async _read() {
    // Do not start the process that puts data on the stream over and over
    // again.
    if (this._started) {
      return;
    }

    this._started = true; // Yep, we've started.

    // Uncomment this if you want to see when the stream starts.
    // console.log("START");

    await this._fetch();
  }
}

// Change the paths to something that makes sense for you.
stream.pipeline(new Walk("/home/", 5),
                fs.createWriteStream("/tmp/paths3.txt"),
                (err) => console.log("ended with", err));
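For comparison only (this is not the answer's code): on Node versions that provide fs.promises.opendir (v12.12+) and stream.Readable.from (v12.3+), a similar pull-based walk can be sketched with an async generator. Readable.from() only advances the generator when the stream asks for more data, so the generator stays suspended while the destination is slow. walkPaths and writePathList are hypothetical names. As in the proof of concept above, directories that cannot be opened are silently skipped, and depth follows the Walk class (the root is depth 1).

```javascript
const fs = require('fs');
const path = require('path');
const { Readable, pipeline } = require('stream');

// Yields one "path\n" line per file, directory or symlink under `root`.
// Directories at maxDepth are listed but not descended into.
async function* walkPaths(root, maxDepth = Infinity) {
  const dirs = [[root, 1]];
  while (dirs.length > 0) {
    const [dir, depth] = dirs.pop();
    let handle;
    try {
      handle = await fs.promises.opendir(dir);
    } catch {
      continue; // proof of concept: skip directories we cannot open
    }
    // Iterating a Dir with for await closes it automatically at the end.
    for await (const dirent of handle) {
      if (!(dirent.isFile() || dirent.isDirectory() || dirent.isSymbolicLink())) continue;
      const fullPath = path.join(dir, dirent.name);
      if (dirent.isDirectory() && depth < maxDepth) dirs.push([fullPath, depth + 1]);
      yield `${fullPath}\n`;
    }
  }
}

// Streams the path list into outFile; callback(err) runs when done.
function writePathList(root, maxDepth, outFile, callback) {
  pipeline(Readable.from(walkPaths(root, maxDepth)), fs.createWriteStream(outFile), callback);
}
```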

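When I run your first attempt, the one using walkdir, I get the following statistics:

  • Elapsed time (wall clock): 59 s
  • Maximum resident set size: 2.90 GB

With the code shown above:

  • Elapsed time (wall clock): 35 s
  • Maximum resident set size: 0.1 GB

The file tree I used for testing produced a 792 MB file list.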