nodejs中的RXJS PostgreSQL背压

Jak*_*icz 2 javascript postgresql node.js rxjs typescript

原标题:如何在javascript中减慢PgSQL结果行流?

我在使用RXJS(5.4.0)和PostgreSQL(驱动程序"pg":"6.1.4")的nodejs v4.5.0中遇到了内存不足的问题.

我手动创建一个可观察的PgSQL行,如下所示:

return Rx.Observable.create((subscriber) => {
    pool.connect().then((client: pg.Client) => {
        const stream:any = client.query(query.toParam());
        stream.on('row', (row) => {
            subscriber.next(row);
        });

        stream.on('end', () => {
            subscriber.complete();
            client.release();
        });
    });
});
Run Code Online (Sandbox Code Playgroud)

然后我将一些运算符附加到rx observable并进行一些处理.请注意,从数据库返回的行有点重.

调查得出一个结论: 数据库中的行返回得快得多,然后才能处理.必须为重型数据保留内存才能等待处理,这会导致内存不足问题:

致命错误:CALL_AND_RETRY_LAST分配失败 - 处理内存不足

中止陷阱:6

我没有在PostgreSQL驱动程序上看到任何选项来暂停流.我有什么想法可以解决这个问题?

pau*_*els 5

如果您使用同一作者的pg-cursor,则应该相对简单:

return Rx.Observable.defer(() => pg.connect())
  .flatMap(client => {
    const cursor = client.query(new Cursor('SELECT * FROM some_table WHERE prop > $1', [100]))

    const observableCursor = Rx.Observable.bindNodeCallback(cursor.read.bind(cursor));

    // Get the first 100 items
    observableCursor(100)
      .map(processRows)
      // This will only emit after the first one completes
      // and will recursively call this for each result
      .expand(_ => 
        observableCursor(100)
          .map(processRows)
      )
      // Unsubscribes once we don't get any more results
      .takeWhile(rows => rows.length > 0)
  });
Run Code Online (Sandbox Code Playgroud)