递归可观察

Ele*_*erm 4 rxjs reactivex

我正在使用RxJ,必须建立一种轮询机制才能从服务器检索更新。

我需要每秒发出一个请求,解析更新,发出请求并记住它的ID,因为我需要它来请求下一个更新包,例如getUpdate(lastId + 1)

第一部分是容易的,所以我只是用intervalmergeMap

let lastId = 0
const updates = Rx.Observable.interval(1000)
    .map(() => lastId)
    .mergeMap((offset) => getUpdates(offset + 1))
Run Code Online (Sandbox Code Playgroud)

我正在收集这样的标识符:

updates.pluck('update_id').scan(Math.max, 0).subscribe(val => lastId = val)
Run Code Online (Sandbox Code Playgroud)

但是这种解决方案不是纯粹的反应性的,我正在寻找一种方法来省略“全局”变量的使用。

我如何改进代码,同时仍然能够返回仅包含调用者更新的Observable?

UPD。

服务器对getUpdates(id)的响应如下所示:

[
  { update_id: 1, payload: { ... } },
  { update_id: 3, payload: { ... } },
  { update_id: 2, payload: { ... } }
]
Run Code Online (Sandbox Code Playgroud)

它可以按任何顺序包含0到Infinity更新

Ing*_*ürk 5

像这样吗 注意,这是一个无限的流,因为没有任何条件可以中止。你没有给。

// Just returns the ID as the update_id.
const fakeResponse = id => {
  return [{ update_id: id }];
};

// Fakes the actual HTTP call with a network delay.
const getUpdates = id => Rx.Observable.of(null).delay(250).map(() => fakeResponse(id));

// Start with update_id = 0, then recursively call with the last
// returned ID incremented by 1.
// The actual emissions on this stream will be the full server responses.
const updates$ = getUpdates(0)
  .expand(response => Rx.Observable.of(null)
    .delay(1000)
    .switchMap(() => {
      const highestId = Math.max(...response.map(update => update.update_id));
      return getUpdates(highestId + 1);
    })
  )

updates$.take(5).subscribe(console.log);
Run Code Online (Sandbox Code Playgroud)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)

要定义流的终止,您可能想switchMap在最后插入钩子。使用的任何属性response有条件地返回Observable.empty()而不是getUpdates再次调用。