rxjs 如何完成 observable?

Sve*_*art 3 javascript rxjs typescript ecmascript-6 angular

要学习 rxjs,我正在玩它。我的代码:

// RxJS v6+
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';

const source = interval(1000);

const example = source.pipe(
  map(value => value +1),
  map(value => {
    if(value === 40) {
      finish();
    }
    else if (value % 5 === 0){
      return 'can devide by 5 we did some magic';
  }else{
     return value;
  } })
);
const subscribe = example.subscribe(
  val => console.log(val), 
  error => console.log("Error handled: " , error), 
  () => console.log('resolved'));
Run Code Online (Sandbox Code Playgroud)

我的想法是运行它 40 次,然后完成 observable(这可能是另一个要求,例如查看 10:00 的值是否为 10(主要目标是对值进行评估并强制完成))。我正在寻找占位符 finish() 的替代方案,因为 Finish 不存在。我怎样才能() => console.log('resolved')进入 subscribe 方法的解析功能?

我发现如何在 RxJS 中完成 Observable但答案是从 2015 年开始的,我假设现在有当前 rxjs 版本的答案。

Ton*_*Ngo 7

Acuatally 仍然是相同的,您只需要使用管道运算符。您可以在此处查看示例

import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source = interval(1000);
const timer$ = timer(5000);
const example = source.pipe(takeUntil(timer$));
const subscribe = example.subscribe(val => console.log(val));
Run Code Online (Sandbox Code Playgroud)


mal*_*awi 3

两个答案都提到 takeuntil 和 take 是正确的,但另一种方法是使用订阅对象取消订阅它只是另一个选项

const subx= example.subscribe(val =>  { 
   console.log(val); 
   if (val == 40) {
    subx.unsubscribe() 
   }
 });
Run Code Online (Sandbox Code Playgroud)

演示

更新

如果您有很多订阅者,并且您想添加完成源可观察的条件,则操作符可以在这里完成这项工作

const source = interval(1000).pipe(take(5)); // 

source.pipe(map(res => res * 10)).subscribe(val => {
  console.log("", val);
});

source.subscribe(val => {
  console.log(val);
});
Run Code Online (Sandbox Code Playgroud)

演示