如何在链接的 observables 之间传递结果

d0x*_*d0x 29 rxjs typescript angular rxjs-observables

抽象问题:每次源 Observable 发出和事件时,都需要触发一系列 API 调用和 Angular 服务。其中一些调用取决于以前的结果。

在我的示例中,源 ObservablestartUpload$触发了一系列依赖调用。

使用解构可以这样写:

this.startUploadEvent$.pipe(
      concatMap(event => this.getAuthenticationHeaders(event)),
      map(({ event, headers }) => this.generateUploadId(event, headers)),
      tap(({ event, headers, id }) => this.emitUploadStartEvent(id, event)),
      concatMap(({ event, headers, id }) => this.createPdfDocument(event, headers, id)),
      concatMap(({ event, headers, id, pdfId }) => this.uploadBilderForPdf(event, pdfId, headers, id)),
      mergeMap(({ event, headers, id, pdfId, cloudId }) => this.closePdf(cloudId, event, headers, id, pdfId)),
      tap(({ event, headers, id, pdfId, cloudId }) => this.emitUploadDoneEvent(id, event, cloudId)),
).subscribe()
Run Code Online (Sandbox Code Playgroud)

它几乎读起来像是一种命令式方法。但它有一些问题:

  • 解构链在代码中不断重复并变得越来越长 { event, headers, id, pdfId, cloudId }
  • 方法(如generateUploadId(event, headers))需要接收所有先前的值,以便它们能够将它们传递到下一个管道,即使方法本身不需要它
  • 内部 Observables(在方法中)需要映射这些值,以便进一步的管道阶段可以破坏它们:

_

private closePdf(cloudId, event, headers, id, pdfId) {
    return this.httpClient.post(..., { headers } )
        .pipe(
             //...,
             map(() => ({ event, headers, id, pdfId, cloudId }))
        )
}
Run Code Online (Sandbox Code Playgroud)

如果编译器可以处理样板(如 with async await)来编写这样的代码(没有上述问题),那就太好了:

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId
  }
Run Code Online (Sandbox Code Playgroud)

如何在没有我提到的问题的情况下在链接的 observables 之间传递结果?有没有我错过的 rxjs 概念?

Biz*_*Bob 14

你当然不应该让你的方法接受与它们无关的参数!

对于您的主要问题:

如何在没有我提到的问题的情况下在链接的 observables 之间传递结果?

使用单个作用域(嵌套管道)

下面的代码等效于您的示例代码,无需传递不必要的属性。先前返回的值可以通过函数调用在链中进一步访问:

1   startUploadEvent$.pipe(
2     concatMap(event => getAuthenticationHeaders(event).pipe(
3       map(headers => generateUploadId(event, headers).pipe(
4         tap(id => emitUploadStartEvent(id, event)),
5         concatMap(id => createPdfDocument(event, headers, id)),
6         concatMap(pdfId => uploadBilderForPdf(event, pdfId)),
7         tap(cloudId => closePdf(cloudId, event))
8       ))
9     ))
10  ).subscribe();
Run Code Online (Sandbox Code Playgroud)

注意下游如何访问eventheaders访问。它们不需要传递到不需要它们的函数中。

有没有我错过的 rxjs 概念?

也许。?并不真地... :-)

诀窍是添加一个.pipe有效地对运算符进行分组,以便他们都可以访问输入参数。

通常,我们尝试将代码平放在以下内容中.pipe

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`)),
3     map(response => response.data.userName),
4     map(name => `Hello ${name}!`),
5     tap(greeting => console.log(greeting))
6   );
Run Code Online (Sandbox Code Playgroud)

但该代码实际上与以下内容没有什么不同:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`).pipe(
3       map(response => response.data.userName),
4       map(name => `Hello ${name}! (aka User #${id})`)
5     )),
6     tap(greeting => console.log(greeting))
7   );
Run Code Online (Sandbox Code Playgroud)

但是,在第二种情况下,第 4 行可以访问nameid,而在第一种情况下它只能访问name

注意第一个的签名是 userId$.pipe(switchMap(), map(), map(), tap())

第二个是:userId$.pipe(switchMap(), tap())


yur*_*zui 11

您的方法绝对不应与上下文耦合,也不应考虑将结果映射到特定形状。

RxJS 是关于函数式编程的。在函数式编程中,有一种模式,如Adapting Arguments to Parameters ref

它允许我们将方法签名与上下文分离。

为了实现这一点,您可以编写map, contentMap,mergMap运算符的上下文相关版本,以便最终解决方案如下所示:

this.startUploadEvent$.pipe(
      map(withKey('event')),
      concatMap_(({event}) => this.getAuthenticationHeaders(event), 'headers'),
      map_(({ headers }) => this.generateUploadId(headers), 'id'),
      tap(({ event, id }) => this.emitUploadStartEvent(id, event)),
      concatMap_(({ id }) => this.createPdfDocument(id), 'pdfId'),
      concatMap_(({ pdfId }) => this.uploadBuilderForPdf(pdfId), 'cloudId'),
      mergeMap_(({ cloudId }) => this.closePdf(cloudId)),
      tap(({id, event, cloudId}) => this.emitUploadDoneEvent(id, event, cloudId)),
    ).subscribe(console.log);
Run Code Online (Sandbox Code Playgroud)

请注意_在这些运算符之后。

Stackblitz 示例

这些自定义操作符的目标是通过投影函数将参数对象添加到原始参数对象中。

function map_<K extends string, P, V>(project: (params: P) => V): OperatorFunction<P, P>;
function map_<K extends string, P, V>(project: (params: P) => V, key: K): OperatorFunction<P, P & Record<K, V>>;
function map_<K extends string, P, V>(project: (params: P) => V, key?: K): OperatorFunction<P, P> {
  return map(gatherParams(project, key));
}

function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return concatMap(gatherParamsOperator(projection, key));
}

function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return mergeMap(gatherParamsOperator(projection, key));
}

// https://github.com/Microsoft/TypeScript/wiki/FAQ#why-am-i-getting-supplied-parameters-do-not-match-any-signature-error
function gatherParams<K extends string, P, V>(fn: (params: P) => V): (params: P) => P;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key: K): (params: P) => P & Record<K, V>;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key?: K): (params: P) => P {
  return (params: P) => {
    if (typeof key === 'string') {
      return Object.assign({}, params, { [key]: fn(params) } as Record<K, V>);
    }

    return params;
  };
}

function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>): (params: P) => Observable<P>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key: K): (params: P) => Observable<P & Record<K, V>>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key?: K): (params: P) => Observable<P> {
  return (params: P) => {
    return fn(params).pipe(map(value => gatherParams((_: P) => value, key)(params)));
  };
}

function withKey<K extends string, V>(key: K): (value: V) => Record<K, V> {
  return (value: V) => ({ [key]: value } as Record<K, V>);
}
Run Code Online (Sandbox Code Playgroud)

我在这里使用了函数重载,因为有时我们不需要为参数添加额外的键。参数应该只在this.closePdf(...)方法的情况下通过它。

因此,您将获得与之前具有类型安全性相同的解耦版本:

在此处输入图片说明

看起来不是过度设计吗?

在大多数情况下,您应该遵循YAGNI(您不需要它)原则。最好不要给现有代码增加更多的复杂性。对于这种情况,您应该坚持使用运算符之间共享参数的一些简单实现,如下所示:

ngOnInit() {
  const params: Partial<Params> = {};
  this.startUploadEvent$.pipe(
    concatMap(event => (params.event = event) && this.getAuthenticationHeaders(event)),
    map(headers => (params.headers = headers) && this.generateUploadId(headers)),
    tap(id => (params.uploadId = id) && this.emitUploadStartEvent(id, event)),
    concatMap(id => this.createPdfDocument(id)),
    concatMap(pdfId => (params.pdfId = pdfId) && this.uploadBuilderForPdf(pdfId)),
    mergeMap(cloudId => (params.cloudId = cloudId) && this.closePdf(cloudId)),
    tap(() => this.emitUploadDoneEvent(params.pdfId, params.cloudId, params.event)),
  ).subscribe(() => {
    console.log(params)
  });
Run Code Online (Sandbox Code Playgroud)

其中Params类型是:

interface Params {
  event: any;
  headers: any;
  uploadId: any;
  pdfId: any;
  cloudId: any;
}
Run Code Online (Sandbox Code Playgroud)

请注意我在作业中使用的括号(params.cloudId = cloudId)

Stackblitz 示例


还有很多其他方法,但它们需要改变您使用 rxjs 运算符的流程:


wlf*_*wlf 5

你可以:

  • 将每个动作的结果分配给一个 observable

  • 根据早期结果链接后续函数调用

  • 这些结果可以通过以下方式在以后的动作调用中重用 withLatestFrom

  • shareReplay用于防止后面的withLatestFrom订阅导致前面的函数重新执行

    function startUpload(event$: Observable<string>) {
      const headers$ = event$.pipe(
        concatMap(event => getAuthenticationHeaders(event)),
        shareReplay()
        );
    
      const id$ = headers$.pipe(
        map(() => generateUploadId()),
        shareReplay()
        );
    
      const emitUploadEvent$ = id$.pipe(
        withLatestFrom(event$),   // use earlier result
        map(([id, event]) => emitUploadStartEvent(id, event)),
        shareReplay()
        );
    
       // etc
    }
    
    Run Code Online (Sandbox Code Playgroud)

如上,函数只接受它们需要的参数,没有传递。

演示:https : //stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts

这个模式可以通过使用 rxjs 自定义操作符来简化(注意这可以进一步改进,包括打字):

function call<T, R, TArgs extends any[], OArgs extends Observable<any>[]>(
  operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,
  action: (...args: any[]) => R,
  ignoreInput: boolean,
  ...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> {
  return (input: Observable<T>) => input.pipe(
    withLatestFrom(...observableArgs),
    operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),
    shareReplay(1)
  );
}
Run Code Online (Sandbox Code Playgroud)

可以像这样使用:

function startUpload(event$: Observable<string>) {
  const headers$ = event$.pipe(
    call(concatMap, getAuthenticationHeaders, true)
  );

  const id$ = headers$.pipe(
    call(map, generateUploadId, false)
  );

  const startEmitted$ = id$.pipe(
    call(map, emitUploadStartEvent, true, event$)
  );

  const pdfId$ = startEmitted$.pipe(
    call(map, createPdfDocument, false, event$, headers$, id$)
  );

  const uploaded$ = pdfId$.pipe(
    call(map, uploadBuilderForPdf, false, event$, pdfId$, headers$, id$)
  );

  const cloudId$ = uploaded$.pipe(
    call(map, closePdf, false, headers$, pdfId$)
  );

  const uploadDone$ = cloudId$.pipe(
    call(map, emitUploadDoneEvent, true, id$, event$)
  );

  // return cloudId$ instead of uploadDone$ but preserve observable chain
  return uploadDone$.pipe(concatMap(() => cloudId$));    
}
Run Code Online (Sandbox Code Playgroud)

演示:https : //stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts