如何让 RxJS Observable 在特定日期时间发出?

Ale*_*ers 8 javascript rxjs

我有一个RxJS Observable需要在特定时间重新计算,如DateTime对象数组所描述的(尽管出于这个问题的目的,它们可以是 JavaScriptDate对象、纪元毫秒或任何其他代表特定时刻的东西):

const changeTimes = [
    //            yyyy, mm, dd, hh, mm
    DateTime.utc( 2018, 10, 31, 21, 45 ),
    DateTime.utc( 2018, 10, 31, 21, 50 ),
    DateTime.utc( 2018, 10, 31, 22, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 00 ),
    DateTime.utc( 2018, 10, 31, 23, 30 ),
];
Run Code Online (Sandbox Code Playgroud)

我正在努力理解如何创建一个可在此类数组中指定的时间发出的 Observable。

为了回答我自己的问题,这是我的想法:

  • 我几乎可以肯定需要使用delay运营商在指定的延迟时间是“现在”和下一个将来的日期时间之间的时间。
  • 不知何故,我需要确保“现在”是当前在认购时,不能以可观察到的创建,可能是时间使用defer运营商-尽管我不想不必要地创建多个可观测情况下,如果有多个订阅。
  • 我不能确定如何随着时间的passes-遍历数组expand操作可能是我所需要的,但它调用的东西递归,我只是想遍历一个列表。
  • timer操作似乎是无关紧要的,因为每个日期时间之间的时间长短不同。
  • 我可以将每个日期时间映射到它自己的延迟 Observable 并通过 返回它们merge,但是随着数组中日期时间的数量增加(可能有数百个),这变得非常低效,所以这绝对是最后的手段。

我怎样才能制作一个 RxJS Observable,它接受一个日期时间列表,然后在每个时间到达时发出,在最后一个完成?

Ale*_*ers 1

给定一个对象数组DateTime

\n\n
const changeTimes = [\n    //            yyyy, mm, dd, hh, mm\n    DateTime.utc( 2018, 10, 31, 21, 45 ),\n    DateTime.utc( 2018, 10, 31, 21, 50 ),\n    DateTime.utc( 2018, 10, 31, 22, 00 ),\n    DateTime.utc( 2018, 10, 31, 23, 00 ),\n    DateTime.utc( 2018, 10, 31, 23, 30 ),\n];\n
Run Code Online (Sandbox Code Playgroud)\n\n

或者更好的是,每次数组更改时,Observable 都会发出一个数组(这就是我的场景中实际发生的情况,尽管我没有在问题中提及它,因为它并不严格相关):

\n\n
const changeTimes$: Observable<DateTime[]> = /* ... */;\n
Run Code Online (Sandbox Code Playgroud)\n\n

下面的 Observable 将立即在订阅时发出下一个未来时间,在前一个未来时间过去时发出每个后续的未来时间,然后完成null

\n\n
const nextTime$ = changeTimes$.pipe(\n    // sort DateTimes chronologically\n    map(unsorted => [...unsorted].sort((x, y) => +x - +y),\n    // remove duplicates\n    map(duplicated => duplicated.filter((item, i) => !i || +item !== +duplicated[i - 1])),\n    // convert each time to a delayed Observable\n    map(times => [...times, null].map((time, i) => defer(() => of(time).pipe(\n        // emit the first one immediately\n        // emit all others at the previously emitted time\n        delay(i === 0 ? 0 : +times[i - 1] - +DateTime.utc())\n    )))),\n    // combine into a single Observable\n    switchMap(observables => concat(...observables)),\n);\n
Run Code Online (Sandbox Code Playgroud)\n\n
    \n
  • 排序是必要的,因为每个内部 Observable(\xe2\x80\x9cwait X 毫秒,然后报告时间\xe2\x80\x9d)在前一个完成时被订阅。
  • \n
  • 删除重复项并不是严格必要的,但似乎适合满足需要。
  • \n
  • defer以便在订阅内部 Observable 时计算当前时间。
  • \n
  • concat用于连续执行每个内部 Observable(感谢martin),避免了列表中每次同时订阅的开销。
  • \n
\n\n

这样就满足了我最初的需求:

\n\n
\n

我有一个 RxJS Observable 需要在特定时间重新计算,如 DateTime 对象数组所描述的......

\n
\n\n

是如果我将其与需要使用运算重新计算的数据结合起来combineLatest以在正确的时间触发重新计算:

\n\n
const timeAwareData$ = combineLatest(timeUnawareData$, nextTime$).pipe(\n    tap(() => console.log(\'either the data has changed or a time has been reached\')),\n    // ...\n);\n
Run Code Online (Sandbox Code Playgroud)\n\n

我对我的解决方案不满意的地方

\n\n

它为列表中的每个时间同时创建一个单独的内部 Observable。我觉得可以重构,使得每个内部 Observable 仅在前一个 Observable 被销毁后才创建。任何改进建议将不胜感激。

\n