Rxjs Subject#next asynchronous or not?

use*_*188 2 rxjs typescript

Hi I wonder if the following code does execute in sequence? i.e. whether the logging "Worker has finished task C" does always happen after the logging "Finished 3 task(s)"?

Long question: with the scan operator, I can be sure the tasks are executed in sequence, that I am not worried. What worries me is I want from the last subscribe to do something only after task C has completed and I am not sure if the position of where o.complete() is put guarantees that. e.g. will start() runs do.("A") -> do.("B") -> do.("C") without waiting for the scans to complete and runs o.complete() straight away, giving an output:

Worker has finished task C
Doing task A
Finished 1 task(s)
Doing task B
Finished 2 task(s)
Doing task C
Finished 3 task(s)
Run Code Online (Sandbox Code Playgroud)

If that can be the case, how do you fix the code so that achieves what I have described?

https://stackblitz.com/edit/typescript-xhhwme

class Worker {
  private tasks: Subject<string>;
  public init(): Observable<number> {
    this.tasks = new Subject<string>();
    return this.tasks.scan((count, task) => {
      console.log("Doing task " + task);
      return ++count;
    }, 0).asObservable();
  }
  public do(task: string): void {
    this.tasks.next(task);
  }
}

function start(worker: Worker): Observable<void> {
  return Observable.create(o => {
    const monitor = worker.init();
    monitor.subscribe(c => console.log("Finished " + c + " task(s)"));
    worker.do("A");
    worker.do("B");
    worker.do("C");
    o.complete();
    worker.do("D");
  });
}

const worker = new Worker();
start(worker).subscribe({
  complete: () => console.log("Worker has finished task C")
});
Run Code Online (Sandbox Code Playgroud)

a b*_*ver 7

TLDR:Subject.next是同步的。

如果源是同步的,则反应性流是同步的,除非您明确地使它们异步或将它们与异步流混合。这些都不会在您的代码中发生。一些例子:

//Synchronous
of(1,2)
  .subscribe(console.log);

//asynchronous because of async source
interval(1000)
  .subscribe(console.log);

//aynchronous because one stream is async (interval)
of(1,2)
  .pipe(
    mergeMap(x => interval(1000).pipe(take(2)))
  )
  .subscribe(console.log);

//async because we make it async
of(1,2, asyncScheduler)
  .subscribe(console.log);
Run Code Online (Sandbox Code Playgroud)

您的示例发生了什么?内部的所有内容都Observable.create将立即执行。当你调用worker.do("A");然后this.tasks.next(task);发出一个新的值,tasks在执行流链(同步)。B和也会发生相同的情况C

调用时o.complete();start(worker)流完成并被"Worker has finished task C"打印。然后Dtasks流执行。

您可以在以下文章中找到有关异步/同步行为的更多详细信息:

  • 这是 SO 答案应该是什么的完美示例。不仅解决了问题,还提供了相关的进修资料。您可以成为真正的好老师,谢谢! (4认同)