RxJs从结果Promise中创建Observable

Max*_*101 12 javascript rxjs angularjs

我是RxJs的新手,我想了解将Rx与Promises结合使用的最佳方法.

我想要创建的是Angular中的一项服务,它充当事件调度程序模式,并在承诺完成后发出事件.我还要求的是,如果没有(事件)订阅者,则observable永远不会被调用.我想要发生的最后一件事是,observable的任何后续订阅者获得相同的结果而不会触发对服务器的另一个请求.我已经设法在这里实现我自己的解决方案:

// ... CountryService code

var COUNTRIES_LOADED = Rx.Observable
    .create(function (observer) {
        $http
            .get('/countries')
            .then(function (res) {
                observer.onNext(res);
            }, function (err) {
                observer.onError(err);
            })
            .finally(function () {
                observer.onCompleted();
            });
    })
    .shareReplay();
Run Code Online (Sandbox Code Playgroud)

现在,任何时候我订阅一个新的"听众"来对象观察者将被拉.任何新订阅者都将获得缓存的值,而无需再次触及服务器.

所以在我的"消费者"(Angular Directive)中,我想做这样的事情:

// ... countryInput directive code:

COUNTRIES_LOADED.subscribe(function (response) {
    // Fill in countries into scope or ctrl
    scope.countries = response.countries;
});
Run Code Online (Sandbox Code Playgroud)

COUNTRIES_LOADED观察者的任何未来订阅者都不得触发$ http请求.同样,如果指令从未包含在页面中,则永远不会调用$ http.

上面的解决方案有效,但我不知道这种方法的潜在缺点和内存含义.这是有效的解决方案吗?有没有更好/更合适的方法来实现这个使用RxJs?

非常感谢!

cmd*_*cmd 7

使用 Rx.Observable.fromPromise(promise)

fromPromise:

转换符合Promise/A +规范的Promise和/或符合ES2015的Promise或将所述Promise返回到Observable序列的工厂函数.

例:

var source = Rx.Observable.fromPromise(promise);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });
Run Code Online (Sandbox Code Playgroud)

更新

rxjs6方法是 from


Cle*_*ent 5

更新

rxjs6 开始,您可以使用from()


您是否尝试过使用rxjs5fromPromise()API ?

在这里检查它的文档!


Max*_*101 4

我在这里找到了答案(只是名称略有不同) rxjs 在订阅时仅使用一次 Promise

因此,对于我的示例,答案很简单:

var loadCountries = function () { return $http.get('/countries'); };

var observable = Rx.Observable.defer(loadCountries).shareReplay();
Run Code Online (Sandbox Code Playgroud)