标签: rxjs6

RXJS 出错后继续 concat subscribe

我有一系列需要按顺序触发的 observable。一旦发生错误,我需要捕获错误,记录它并继续观察。

目前,一旦发生错误,观察者就会停止。观察者必须继续并且不会因错误而重新启动或完成。

import * as Rx from "rxjs";
const source = [
  Rx.Observable.from("1").delay(200),
  Rx.Observable.from("2").delay(150),
  Rx.Observable.throw("error"),
  Rx.Observable.from("3").delay(124),
  Rx.Observable.from("4").delay(201),
];
let sSource = Rx.Observable.concat(...source);
sSource.subscribe((v) => {console.log(v)}, (e) => {console.log(e)});
Run Code Online (Sandbox Code Playgroud)

电流输出:

1
2
error
Run Code Online (Sandbox Code Playgroud)

预期输出:

1
2
error
3
4
Run Code Online (Sandbox Code Playgroud)

我们能想出的唯一解决方案是预先循环遍历sourceobservable 并单独向它们添加 catch 处理程序,然后一旦发生错误,它就会得到正确处理,观察者可以继续操作,而无需完成整个串联的 observable。

我们觉得应该有一个更优雅的解决方案。如果需要,我会发布我们目前的解决方案。

rxjs typescript rxjs5 rxjs6

8
推荐指数
1
解决办法
4836
查看次数

如何在 rxjs 6.4.0 Angular 中替换 flatMap 和 mergeMap

我的 Angular 7 应用程序中有一个拦截器,它在重新发出请求之前发出获取令牌的请求。如果同时出现多个请求,那么它们都会发出获取令牌的请求。为了避免这种情况,我共享一个可观察量,以便共享获取令牌的结果,并且只发出一个请求来获取令牌。

我创建共享可观察量如下

this.authService.sharedToken =   this.authService.getToken().pipe(share());
Run Code Online (Sandbox Code Playgroud)

然后我提出请求

 return auth.sharedToken.flatMap((res) => {

     auth.saveTokenToLocalStorage(res);
     return this.getRequestWithAuthentication(request, next, auth);

 }).catch(function (err) {// I handle errors here
 }
Run Code Online (Sandbox Code Playgroud)

问题是 flatMap 已被弃用,用 mergeMap 替换它也不起作用。看来 mergeMap 现在是一个独立的函数。那么我怎样才能让上面的代码工作呢?

我正在使用 rxjs 6.4.0 和 Angular 7.2.4

谢谢。

编辑:

使用新的管道方法我有以下内容:

  return auth.sharedToken.pipe(
            mergeMap((res) => {

                auth.saveTokenToLocalStorage(res);
                return this.getRequestWithAuthentication(request, next, auth);
            }), catchError(function (err) {
                console.log("failed to get token")
                return EMPTY;
            }));
Run Code Online (Sandbox Code Playgroud)

当请求失败时,我无法打印“无法获取令牌”。我在那里做了更多的错误处理,所以我需要在请求失败时触发一些代码。

rxjs angular rxjs6

8
推荐指数
1
解决办法
1万
查看次数

如何获得"Observable.of([]);" 上班?

什么是正确的表达和导入Observable.of([]);.

import { of } from 'rxjs'; 不适合我.

rxjs typescript rxjs5 angular rxjs6

7
推荐指数
1
解决办法
6208
查看次数

Angular 6:属性'of'在类型'typeof Observable'上不存在

我正在使用Angular 6 使用"rxjs":"^ 6.0.0",

错误:属性'of'在类型'typeof Observable'上不存在.

import { Injectable } from '@angular/core';
import { TranslateLoader } from '@ngx-translate/core';
import { Observable, Subject, pipe, of } from 'rxjs';


@Injectable()
export class MiTranslateLoaderService implements TranslateLoader {

  getTranslation(lang: string): Observable<any> {
    return Observable.of({
      lbl_select: 'Select',
    });
  }
}
Run Code Online (Sandbox Code Playgroud)

rxjs angular6 rxjs6

7
推荐指数
2
解决办法
1万
查看次数

BehaviorSubject映射一个BehaviorSubject

我有一个rxjs @ 6 BehaviorSubject source$,我想从中获取子值source$

  const source$ = new BehaviorSubject(someValue);
  const subSource$ = source$.pipe(map(transform));
Run Code Online (Sandbox Code Playgroud)

我期望the subSource$也是一个BehaviorSubject,但不是,我如何才能获得subSource$一个BehaviorSubject?

rxjs rxjs6

7
推荐指数
1
解决办法
1227
查看次数

类型 'Observable&lt;HttpEvent&lt;&gt;&gt;' 不可分配给类型 'Observable&lt;&gt;'

我有这个代码片段:

SubmitTransaction(transNumber: string, transactionRequest: ITransactionRequestObj): Observable<TransactionResponse> {
    this.body =  JSON.stringify(transactionRequest);
    this.headers = new HttpHeaders().set('Content-Type', 'application/json');
    this.headers.append('Accept', 'application/json');
    this.options = new RequestOptions({headers: this.headers});
    return this.http.post<TransactionResponse>(this.baseUrl + '/transactions/' + transNumber + '/new',  this.body, this.options)
   .pipe(catchError((e) => this.errorHandler(e)));
  }
Run Code Online (Sandbox Code Playgroud)

我所做的就是将我的项目从 Angular 5 升级到 Angular 6 并更改.catch((e) => this.errorHandler(e));.pipe(catchError((e) => this.errorHandler(e)));. 但是,对于此特定方法,我收到以下 TypeScript 错误。

错误:

[ts]
Type 'Observable<HttpEvent<TransactionResponse>>' is not assignable to type 'Observable<TransactionResponse>'.
  Type 'HttpEvent<TransactionResponse>' is not assignable to type 'TransactionResponse'.
    Type 'HttpSentEvent' is not assignable to type 'TransactionResponse'. …
Run Code Online (Sandbox Code Playgroud)

typescript angular angular6 rxjs6

7
推荐指数
1
解决办法
6693
查看次数

distinctUntilChanged 设置初始值

对于 rxjs,我可以在使用时提供初始值distinctUntilChanged吗?当没有收到以前的值时,似乎完全跳过了等于检查。当它收到的第一个值是初始值时,我不想发出任何内容。


用例

就我而言,我正在使用 angular,我正在使用反应式形式并订阅其值更改。由于它是一个输入字段,我有一个典型的管道设置:

  • debounceTime(350)
  • distinctUntilChanged()

当用户将初始字段从空更改为某些内容然后在去抖时间内再次为空时,无论如何都会触发该事件,这不应该发生。


我试过的

  • 提供一个自定义的 equals 方法 distinctUntilChanged
    • 不工作,因为管道在第一次遇到时被完全跳过
  • next使用默认值对基础主题 强制 a
    • 这仍然会在我宁愿避免的去抖时间之后发出一个值

distinct-values observable rxjs6

7
推荐指数
2
解决办法
3354
查看次数

RxJs 中的“new Observable()”和“of()”有什么区别

之间有什么区别new Observable()of()RxJs

在我的测试用例中,当我尝试返回new Observable()它时,它给了我一个有线错误,如果我用of()Rxjs替换它,它工作正常。

我的印象是Observable.create()new Observable()并且of()做同样的事情。

someService.getMethod().pipe(
  ...
  catchError((e) => {
    // some operation
    // return new Observable(false as any);  // ----> creates error
    return of(false as any); // this works perfectly
  })
)
Run Code Online (Sandbox Code Playgroud)

rxjs angular rxjs6

7
推荐指数
1
解决办法
4834
查看次数

暂停/恢复计时器 Observable

我正在使用 angular/rxjs6 构建一个简单的秒表,我可以启动计时器,但无法暂停/恢复它。

  source: Observable<number>;
  subscribe: Subscription;

  start() {
    this.source = timer(0, 1000);
    this.subscribe = this.source
      .subscribe(number => {
        this.toSeconds = number % 60;
        this.toMinutes = Math.floor(number / 60);
        this.toHours = Math.floor(number / (60 * 60));

        this.seconds = (this.toSeconds < 10 ? '0' : '') + this.toSeconds;
        this.minutes = (this.toMinutes < 10 ? '0' : '') + this.toMinutes;
        this.hours = (this.toHours < 10 ? '0' : '') + this.toHours;
    });
  }

  pause() {
    this.subscribe.unsubscribe(); // not working
  }
Run Code Online (Sandbox Code Playgroud)

经过大量搜索后,我发现我应该使用switchMap …

angular angular6 rxjs6

7
推荐指数
1
解决办法
5863
查看次数

How to understand complicated typescript generic types?

I was checking RxJS api documents. I always face problem in understanding the weird syntax written on the top, Which is like below as an example:

combineLatest<O extends ObservableInput<any>, R>(...observables: (O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike)[]): Observable<R>
Run Code Online (Sandbox Code Playgroud)

I wanted to understand this kind of syntax and further write it by my own.

So, It will be great help if anyone explain what's going on in above example.

typescript reactjs typescript-generics angular rxjs6

7
推荐指数
1
解决办法
149
查看次数