Observable 内部的异步/等待

Mug*_*tsu 7 javascript observable rxjs

如何在 Observable 中使用 async/await ?使用此代码,我无法在可观察范围内触发取消订阅功能,因此间隔未被清除。

const { Observable } = require("rxjs");

const test = () => new Observable(async (subscriber) => {
  await Promise.resolve();

  const a = setInterval(() => {
    subscriber.next(Math.random());
    console.log("zz");
  }, 500);

  return () => {
    console.log("asdsad");
    clearInterval(a);
  };
});

const xyz = test().subscribe(console.log);


setTimeout(() => {
  xyz.unsubscribe();
}, 3000);
Run Code Online (Sandbox Code Playgroud)

Coo*_*ott 5

不支持可观察对象内的异步/等待。但是,可以通过行为主体和异步嵌套函数来完成。

创建一个行为主体,将其转换为可观察对象(.asObservable()),执行异步嵌套函数,返回可观察对象。这是一个例子。

function getProgress() {

    // Change this value with latest details
    const value = new BehaviorSubject('10%');
    const observable = value.asObservable();

    // Create an async function
    const observer = async() => {
      // Perform all tasks in here
      const wait1 = await new Promise(resolve => setTimeout(resolve, 3000));
      value.next('66%');

      const wait2 = await new Promise(resolve => setTimeout(resolve, 3000));
      value.next('100%');

      // Complete observable
      value.complete();
    }

    // Call async function & return observable
    observer();
    return observable;
  }
Run Code Online (Sandbox Code Playgroud)

它非常具有可读性,而且很有魅力。


Rem*_*iak 2

首先,传递给可观察构造函数的订阅者不能是异步函数。对此没有任何支持。

如果您需要从 Promise 创建 Observable,请使用from

import { from } from 'rxjs';
const observable = from(promise);
Run Code Online (Sandbox Code Playgroud)

但考虑到你的情况。

因为无法取消原生 js 承诺,所以您无法真正取消订阅此类创建的可观察量,因此:

const obs = from(new Promise(resolve => {
  setTimeout(() => {
    console.log('gonna resolve');
    resolve('foo');
  }, 1000);
}));

const sub = obs.subscribe(console.log);
setTimeout(() => sub.unsubscribe(), 500);
Run Code Online (Sandbox Code Playgroud)

将打印:

gonna resolve
gonna resolve
gonna resolve
(...)
Run Code Online (Sandbox Code Playgroud)

所以是的:gonna resolve将一直在控制台中打印,但仅此而已 - 传递给解析的结果将被忽略- 只是不记录。

另一方面,如果setTimeout(() => sub.unsubscribe(), 500);这次删除取消订阅 ( ),您将看到:

gonna resolve
foo
gonna resolve
gonna resolve
gonna resolve
(...)
Run Code Online (Sandbox Code Playgroud)

有一种方法可能会对您有所帮助 -defer但它与您的问题并不严格相关。

import { defer } from 'rxjs';

defer(async () => {
  const a = await Promise.resolve(1);
  const b = a + await Promise.resolve(2);

  return a + b + await Promise.resolve(3);
}).subscribe(x => console.log(x)) // logs 7
Run Code Online (Sandbox Code Playgroud)