使用生成器和Promise进行Node.js功能编程

Mua*_*adi 5 functional-programming node.js lodash ramda.js

摘要

node.js中的函数式编程是否足够通用?它是否可以用于处理小批量的db记录而不在使用中将所有记录加载到内存中toArray(从而耗尽内存)的现实问题。您可以阅读有关背景的批评。我们想用异步生成器演示这种node.js库的Mux和DeMux以及fork / tee / join功能。

语境

我在质疑使用任何函数式编程工具(例如ramdalodashimlazy)在node.js中进行函数式编程的有效性和普遍性)甚至自定义的。

给定

来自MongoDB游标的数百万条记录可以使用 await cursor.next()

您可能需要阅读更多内容about async generatorsfor-await-of

对于虚假数据,可以使用(在节点10上)

function sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
}
async function* getDocs(n) {
  for(let i=0;i<n;++i) {
     await sleep(1);
     yield {i: i, t: Date.now()};
  }
}
let docs=getDocs(1000000);
Run Code Online (Sandbox Code Playgroud)

通缉

我们需要

  • 第一份文件
  • 最后文件
  • 文件数量
  • 分为n个文档的批处理/批量处理,并为该批量处理发出一个socket.io事件

确保批次中包含第一个和最后一个文档,并且不消耗它们。

约束条件

数百万条记录不应加载到ram中,应该对其进行迭代,最多只能保存其中的一批。

可以使用常规的nodejs代码完成此要求,但也可以使用诸如此处的 applyspec之类的方法来完成。

R.applySpec({
  first: R.head(),
  last: R.last(),
  _: 
    R.pipe(
      R.splitEvery(n),
      R.map( (i)=> {return "emit "+JSON.stringify(i);})
    ) 
})(input)
Run Code Online (Sandbox Code Playgroud)

cus*_*der 2

我不确定是否可以公平地暗示函数式编程在处理大量数据时在性能方面比命令式编程有任何优势。

我认为您需要在工具包中添加另一个工具,这可能是RxJS

RxJS 是一个使用可观察序列编写异步和基于事件的程序的库。

如果您一般不熟悉 RxJS 或响应式编程,我的示例肯定看起来很奇怪,但我认为熟悉这些概念将是一项很好的投资

在您的例子中,可观察的序列是您的 MongoDB 实例,它随着时间的推移发出记录。

我要伪造你的数据库:

var db = range(1, 5);
Run Code Online (Sandbox Code Playgroud)

range函数是一个 RxJS 事物,它将在提供的范围内发出一个值。

db.subscribe(n => {
  console.log(`record ${n}`);
});

//=> record 1
//=> record 2
//=> record 3
//=> record 4
//=> record 5
Run Code Online (Sandbox Code Playgroud)

现在我只对第一条和最后一条记录感兴趣。

我可以创建一个仅发出第一条记录的可观察对象,并创建另一个仅发出最后一条记录的可观察对象:

var db = range(1, 5);
var firstRecord = db.pipe(first());
var lastRecord = db.pipe(last());

merge(firstRecord, lastRecord).subscribe(n => {
  console.log(`record ${n}`);
});
//=> record 1
//=> record 5
Run Code Online (Sandbox Code Playgroud)

不过,我还需要批量处理所有记录:(在本示例中,我将创建每批 10 条记录)

var db = range(1, 100);
var batches = db.pipe(bufferCount(10))
var firstRecord = db.pipe(first());
var lastRecord = db.pipe(last());

merge(firstRecord, batches, lastRecord).subscribe(n => {
  console.log(`record ${n}`);
});

//=> record 1
//=> record 1,2,3,4,5,6,7,8,9,10
//=> record 11,12,13,14,15,16,17,18,19,20
//=> record 21,22,23,24,25,26,27,28,29,30
//=> record 31,32,33,34,35,36,37,38,39,40
//=> record 41,42,43,44,45,46,47,48,49,50
//=> record 51,52,53,54,55,56,57,58,59,60
//=> record 61,62,63,64,65,66,67,68,69,70
//=> record 71,72,73,74,75,76,77,78,79,80
//=> record 81,82,83,84,85,86,87,88,89,90
//=> record 91,92,93,94,95,96,97,98,99,100
//=> record 100
Run Code Online (Sandbox Code Playgroud)

正如您在输出中看到的,它发出了:

  1. 第一条记录
  2. 十批,每批 10 条记录
  3. 最后记录

我不会尝试为您解决您的练习,而且我对 RxJS 不太熟悉,无法对此进行过多扩展。

我只是想向您展示另一种方法,并让您知道可以将其与函数式编程结合起来。

希望能帮助到你