如何等待RxJS内部订阅方法

apr*_*ity 13 async-await rxjs typescript angular

在RxJS主题的​​订阅回调中,我想要await一个async函数.下面是打字稿发布者抱怨说的代码示例:

错误:(131,21)TS2304:找不到名称'await'.

async ngOnInit() {
  this.subscriber = dateSubscription.subscribe((date: Date) => {
    let dbKey = await this._someService.saveToDatabase(someObject);
    // wait for db write to finish before evaluating the next code
    // ... some other code here
  });
}
Run Code Online (Sandbox Code Playgroud)

通常我在尝试在非async函数内调用await时会看到这个.我是否需要进行订阅回调async或者我是否会犯这个错误?该函数saveToDatabaseasync并返回一个解析为写入的数据库主键的promise.

小智 22

您可以直接将异步签名添加到subscribe中的匿名函数调用中

 this.subscriber = dateSubscription.subscribe(async (date: Date) => {
    let dbKey = await this._someService.saveToDatabase(someObject);
    // wait for db write to finish before evaluating the next code
    // ... some other code here
  });
Run Code Online (Sandbox Code Playgroud)

  • 为什么会这样?可观察的实现是否在`subscribe`中调用`await`?不适合我,会喜欢更多信息.. (4认同)
  • 这个答案是错误的。您可以在其中放置一个异步函数,但是它不会像您期望的那样运行。例如,当您执行Subject.next()通知可观察对象时,直到所有订阅都已执行,该调用才会完成。但是,如果您在其中放置了异步订阅,则对`next()`的调用将立即完成,`Subject.next()`将*不再等待*异步订阅。 (4认同)
  • @MgSam是完全正确的,这是难以捉摸的。在用户函数内使用`await'可能会带来意想不到的副作用,因为不能保证观察对象发出的值不再逐渐被处理。 (2认同)

max*_*992 9

您不需要使用await,也不需要将您转换PromiseObservable.


来自Ben Lesh的 CF 推文:

在此输入图像描述


这是一个模拟函数的例子saveToDatabase:(
和工作的Plunkr:https://plnkr.co/edit/7SDLvRS2aTw9gYWdIznS p = preview )

const { Observable } = Rx;

const saveToDatabase = (date) =>
  new Promise(resolve =>
    setTimeout(() =>
      resolve(`${date} has been saved to the database`),
      1000));

const date$ = Observable.of(new Date()).delay(1000);

date$
  .do(x => console.log(`date received, trying to save it to database ...`))
  .switchMap(date => saveToDatabase(date))
  .do(console.log)
  .subscribe();
Run Code Online (Sandbox Code Playgroud)

输出:
在此输入图像描述


Dav*_*ine 6

你不能await直接观察,但是你await可以Promise.

更新

该方法在 RxJS 8 中.toPromise()已被弃用lastValueFrom(),因此您可以使用or firstValueFrom()

import { lastValueFrom } from 'rxjs';
this.categories = await lastValueFrom(categories$);
Run Code Online (Sandbox Code Playgroud)

感谢罗伯特·伦德尔的评论

RxJS v8 之前的版本

.toPromise()您可以简单地在 observables 订阅上使用该方法。考虑以下:

async ngOnInit() {
  const date = await dateSubscription.toPromise();      
  let dbKey = await this._someService.saveToDatabase(someObject);
}
Run Code Online (Sandbox Code Playgroud)

当你await做出承诺时dateSubscription,你会得到一个Date物体。然后您可以继续执行下一行,这使得读取代码更加顺序。

有些人认为 Angular 不会等待完成ngOnInit,它没有选择。看看这里JavaScript给出的结果。正如您所看到的,将调用等待者,等待者在内部管理和执行底层状态机(生成器)。Angular 对此没有任何控制权。它只是希望调用该方法。TypeScript ngOnInit


wra*_*ord 6

2022 年的 Rxjs:使用pipeconcatMap

我最终来到这里寻找在收听下一个发布的项目之前等待处理昂贵项目的能力。我必须从处理程序async内部更改我的代码subscribe以使用 apipeconcatMap按照另一个类似的 SO 问题

旧代码,在处理新项目之前不会等待对项目的昂贵调用(因此在我的情况下竞争条件比比皆是):

myObservable$.subscribe(async (item) => { await expensiveFoo(item); })
Run Code Online (Sandbox Code Playgroud)

新代码,在处理下一项之前等待昂贵的函数

let subscription = myObservable$.pipe(
    concatMap(async (item) => { await expensiveFoo(item); })
).subscribe(
    (item) => simpleFoo(item),// can also handle sync code here if you want 
    (error) => handleError(error), 
    () => handleComplete()
);
Run Code Online (Sandbox Code Playgroud)

subscription请务必保留对(不是订阅者)的引用,直到您完成可观察的操作。


Bla*_*mba 5

这是我解决这个问题的方法

const title = await new Promise<string>(resolve => 
  this.translate.get('MYBOOK-PAGE.PAGE_TITLE')
   .subscribe(translated => {
     resolve(translated)
   }));
Run Code Online (Sandbox Code Playgroud)

在这里,我正在做的就是将我的Observable变为Promise

注意:这里唯一的问题是这是一次演出,即。如果您订阅一次,将无法再次访问它。适合我,所以在这里分享。

更新:一次 发现的轻松承诺只能toPromise在订阅者面前使用(博客)。因此,对于上述情况,它将是这样的:

const title = await this.translate.get('MYBOOK-PAGE.PAGE_TITLE').toPromise();

Run Code Online (Sandbox Code Playgroud)