Hi, I wonder whether the following code executes in sequence, i.e. whether the log "Worker has finished task C" always appears after the log "Finished 3 task(s)"?
Long question: with the scan operator I can be sure the tasks are executed in sequence, so that part does not worry me. What worries me is that I want the last subscribe to do something only after task C has completed, and I am not sure whether the position of o.complete() guarantees that. For example, could start() run do("A") -> do("B") -> do("C") without waiting for the scans to complete, and then call o.complete() straight away, giving this output:
Worker has finished task C
Doing task A
Finished 1 task(s)
Doing task B
Finished 2 task(s)
Doing task C
Finished 3 task(s)
If that can happen, how do I fix the code so that it achieves what I have described?
https://stackblitz.com/edit/typescript-xhhwme
// Assumes RxJS 5, where importing from 'rxjs' patches operators such as scan onto Observable.
import { Observable, Subject } from 'rxjs';

class Worker {
  private tasks: Subject<string>;

  // Creates the task stream and returns a running count of processed tasks.
  public init(): Observable<number> {
    this.tasks = new Subject<string>();
    return this.tasks.scan((count, task) => {
      console.log("Doing task " + task);
      return ++count;
    }, 0).asObservable();
  }

  // Pushes a task into the stream.
  public do(task: string): void {
    this.tasks.next(task);
  }
}

function start(worker: Worker): Observable<void> {
  return Observable.create(o => {
    const monitor = worker.init();
    monitor.subscribe(c => console.log("Finished " + c + " task(s)"));
    worker.do("A");
    worker.do("B");
    worker.do("C");
    o.complete();
    worker.do("D");
  });
}

const worker = new Worker();
start(worker).subscribe({
  complete: () => console.log("Worker has finished task C")
});
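TL;DR: Subject.next is synchronous.
A reactive stream is synchronous if its source is synchronous, unless you explicitly make it asynchronous or combine it with an asynchronous stream. Neither of those happens in your code. Some examples: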
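import { of, interval, asyncScheduler } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

// Synchronous
of(1, 2)
  .subscribe(console.log);

// Asynchronous because the source is async
interval(1000)
  .subscribe(console.log);

// Asynchronous because one of the streams is async (interval)
of(1, 2)
  .pipe(
    mergeMap(x => interval(1000).pipe(take(2)))
  )
  .subscribe(console.log);

// Asynchronous because we explicitly make it async
// (newer RxJS versions prefer scheduled([1, 2], asyncScheduler))
of(1, 2, asyncScheduler)
  .subscribe(console.log);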
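What happens in your example? Everything inside Observable.create executes immediately. When you call worker.do("A"), this.tasks.next(task) emits a new value and the whole tasks stream chain runs synchronously before next() returns. The same happens for B and C.
When o.complete() is called, the start(worker) stream completes and "Worker has finished task C" is printed. After that, D is still processed by the tasks stream, because that stream is independent of the one returned by start(worker).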
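To make the ordering described above concrete, here is a minimal sketch that does not use RxJS at all. TinySubject is a hypothetical, heavily simplified stand-in for Subject (it is not the library's implementation), and the counter plays the role of scan. It only illustrates the one property that matters here: next() calls its listeners on the caller's stack, so each task is fully handled before the next statement runs.

```typescript
// Hypothetical, simplified stand-in for an RxJS Subject (not the real implementation):
// next() invokes every listener immediately, before it returns.
class TinySubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

// Collect the log lines so the order can be inspected afterwards.
const log: string[] = [];
const tasks = new TinySubject<string>();

// Plays the role of scan plus the monitor subscription in start().
let count = 0;
tasks.subscribe(task => {
  log.push("Doing task " + task);
  count += 1;
  log.push("Finished " + count + " task(s)");
});

tasks.next("A");
tasks.next("B");
tasks.next("C");
// Plays the role of o.complete(): reached only after the three next() calls have returned.
log.push("Worker has finished task C");
tasks.next("D");

console.log(log.join("\n"));
```

In this model, the "Worker has finished task C" line is recorded after "Finished 3 task(s)" and before "Doing task D", which matches the behaviour described for the original code. If the task handling were asynchronous (for example, scheduled on a timer), this guarantee would no longer hold and completion would have to be signalled from inside the stream instead.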
You can find more details about asynchronous and synchronous behavior in the following article: