如何交错流(带背压)

pha*_*dej 10 javascript rxjs bacon.js

假设我有两个可能无限的流:

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
Run Code Online (Sandbox Code Playgroud)

我想合并流,然后用慢速异步操作映射合并流(例如在Bacon with fromPromiseflatMapConcat).

我可以将它们与merge:

me = a12b3.c45d6.7e...
Run Code Online (Sandbox Code Playgroud)

然后映射

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..
Run Code Online (Sandbox Code Playgroud)

如你所见,从长远来看,贪婪的 s2流可以获得优势.这是不受欢迎的行为.


合并行为也不行,因为我想有某种背压有更多的交错,"公平","循环"合并.几个期望的行为的例子:

s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...

s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
Run Code Online (Sandbox Code Playgroud)

认为这是一种方法s1,s2并将任务发送给工作人员,当时只能处理一项任务.有了merge,flatMapConcat我会得到一个贪婪的任务经理,但我想要更公平的.


我想找一个简单而优雅的解决方案.如果它对任意数量的流很容易通用,那就太好了:

// roundRobinPromiseMap(streams: [Stream a], f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);
Run Code Online (Sandbox Code Playgroud)

使用RxJS或其他Rx库的解决方案也很好.


澄清

不是zipAsArray

我不想要:

function roundRobinPromiseMap(streams, f) {
  return Bacon.zipAsArray.apply(null, streams)
    .flatMap(Bacon.fromArray)
    .flatMapConcat(function (x) {
      return Bacon.fromPromise(f(x));
    });
}
Run Code Online (Sandbox Code Playgroud)

比较示例大理石图:

s1  = a.....b..............c.......
s2  = ..1.2.3......................
mm  = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based
Run Code Online (Sandbox Code Playgroud)

是的,我会遇到缓冲问题

......但我也会用直截了当的不公平的方式:

function greedyPromiseMap(streams, f) {
  Bacon.mergeAll(streams).flatMapConcat(function (x) {
    return Bacon.fromPromise(f(x));
  });
}
Run Code Online (Sandbox Code Playgroud)

大理石图

s1    = a.........b..........c...
s2    = ..1.2.3..................
mm    = a...1...2...b...3....c...
merge = a...1...2...3...b....c...
Run Code Online (Sandbox Code Playgroud)

pha*_*dej 2

这里的核心挑战是理解如何形式化公平。在问题中我已经提到了工人的类比。事实证明,明显的公平标准是选择一个比其他流生成更少事件的流,或者更进一步:谁生成的流等待的时间更少。

\n\n

之后,使用指称语义形式化所需的输出就变得非常简单:\n代码位于 GitHub 上

\n\n

我没有时间开发包含 Bacon.js 的指称组合器withStateMachine因此下一步是直接使用Bacon.js在 JavaScript 中重新实现它。整个可运行的解决方案可作为要点提供

\n\n

这个想法是制作一个状态机

\n\n
    \n
  • 每个流的成本和队列作为一个状态
  • \n
  • 流和附加反馈流作为输入
  • \n
\n\n

当整个系统的输出被反馈时,我们可以在前一个 flatMapped 流结束时使下一个事件出队。

\n\n

为此我不得不制作一个有点丑陋的rec组合器

\n\n
function rec(f) {\n  var bus = new Bacon.Bus();\n  var result = f(bus);\n  bus.plug(result);\n  return result;\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

它的类型是(EventStream a -> EventStream a) -> EventStream a- 该类型类似于其他递归组合器,例如fix

\n\n

它可以通过更好的系统范围行为来实现,因为总线会中断取消订阅传播。我们必须为此努力。

\n\n

第二个辅助函数是stateMachine,它接受一组流并将它们转换为单个状态机。本质上是.withStateMachine \xe2\x88\x98 mergeAll \xe2\x88\x98 zipWithIndex.

\n\n
function stateMachine(inputs, initState, f) {\n  var mapped = inputs.map(function (input, i) {\n    return input.map(function (x) {\n      return [i, x];\n    })\n  });\n  return Bacon.mergeAll(mapped).withStateMachine(initState, function (state, p) {\n    if (p.hasValue()) {\n      p = p.value();\n      return f(state, p[0], p[1]);\n    } else {\n      return [state, p];\n    }\n  });\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

使用这两个助手,我们可以编写一个不太复杂的公平调度程序

\n\n
function fairScheduler(streams, fn) {\n  var streamsCount = streams.length;\n  return rec(function (res) {\n    return stateMachine(append(streams, res), initialFairState(streamsCount), function (state, i, x) {\n      // console.log("FAIR: " + JSON.stringify(state), i, x);\n\n      // END event\n      if (i == streamsCount && x.end) {\n        var additionalCost = new Date().getTime() - x.started;\n\n        // add cost to input stream cost center\n        var updatedState = _.extend({}, state, {\n          costs: updateArray(\n            state.costs,\n            x.idx, function (cost) { return cost + additionalCost; }),\n        });\n\n        if (state.queues.every(function (q) { return q.length === 0; })) {\n          // if queues are empty, set running: false and don\'t emit any events\n          return [_.extend({}, updatedState, { running: false }), []];\n        } else {\n          // otherwise pick a stream with\n          // - non-empty queue\n          // - minimal cost\n          var minQueueIdx = _.chain(state.queues)\n            .map(function (q, i) {\n              return [q, i];\n            })\n            .filter(function (p) {\n              return p[0].length !== 0;\n            })\n            .sortBy(function (p) {\n              return state.costs[p[1]];\n            })\n            .value()[0][1];\n\n          // emit an event from that stream\n          return [\n            _.extend({}, updatedState, {\n              queues: updateArray(state.queues, minQueueIdx, function (q) { return q.slice(1); }),\n              running: true,\n            }),\n            [new Bacon.Next({\n              value: state.queues[minQueueIdx][0],\n              idx: minQueueIdx,\n            })],\n          ];\n        }\n      } else if (i < streamsCount) {\n        // event from input stream\n        if (state.running) {\n          // if worker is running, just enquee the event\n          return [\n            _.extend({}, state, {\n              queues: updateArray(state.queues, i, function (q) { return q .concat([x]); }),\n            }),\n            [],\n          ];\n        } else {\n          // if worker isn\'t running, start it right away\n          return [\n            _.extend({}, state, {\n              running: true,\n            }),\n            [new Bacon.Next({ value: x, idx: i})],\n          ]\n        }\n      } else {\n        return [state, []];\n      }\n\n    })\n    .flatMapConcat(function (x) {\n      // map passed thru events,\n      // and append special "end" event\n      return fn(x).concat(Bacon.once({\n        end: true,\n        idx: x.idx,\n        started: new Date().getTime(),\n      }));\n    });\n  })\n  .filter(function (x) {\n    // filter out END events\n    return !x.end;\n  })\n  .map(".value"); // and return only value field\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

要点中的其余代码非常简单。

\n