如何从函数创建Observable?

Tit*_*tan 15 javascript system.reactive rxjs rx-java rxjs5

我想调用一个函数(同步),然后使用它的返回值作为初始发射(随后将一些其他运算符链接到生成的observable上).

我想在订阅期间调用此函数,所以我不能只使用Observable.of(() => getSomeValue()).我已经看过bindCallback(之前的fromCallback),但我不认为它可以用于这项任务(如果我错了,请纠正我).我start在v4文档中看到了静态运算符,但显然它没有在v5中实现(并且没有表明它在路上).RxJava也有fromCallable运营商完成那个afaik.

我能想到的只有这样:

Observable.create((observer: Observer<void>) => {
  let val = getSomeValue();
  observer.next(val);
  observer.complete();
})
Run Code Online (Sandbox Code Playgroud)

我认为就是这样.但是这对于简单的事情来说似乎很复杂,应该是这样的Observable.fromFunction(() => getSomeValue()).如果我想异步运行它,就像start运算符一样?如何在RxJS的当前版本中执行此操作?

pau*_*els 19

我倾向于避免任何Observable.create可能的明确使用,因为通常它不仅需要管理事件发射而且还需要管理您的拆卸逻辑.

你可以Observable.defer改用.它接受一个返回一个Observable或一个Observable-like东西的函数(读取:Promise,Array,Iterators).因此,如果您有一个返回异步事物的函数,它就像:

Observable.defer(() => doSomethingAsync());
Run Code Online (Sandbox Code Playgroud)

如果您希望这可以使用同步结果,那么执行:

Observable.defer(() => Observable.of(doSomethingSync()));
Run Code Online (Sandbox Code Playgroud)

注意:这样create会重新运行每个订阅的功能.这是不同的,然后说它的结果Observable.bindCallback存储函数调用结果而不重新执行该函数.因此,如果您需要这种行为,则需要使用适当的multicasting运算符.


Mat*_*att 5

fromFunction$我在项目中使用的a 的实现:

function fromFunction$<T>(factory: () => T): Observable<T> {
    return Observable.create((observer: Subscriber<T>) => {
        try {
            observer.next(factory());
            observer.complete();
        } catch (error) {
            observer.error(error);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

用法如下:

fromFunction$(() => 0).subscribe((value) => console.log(`Value is '${value}'`), null, () => console.log('Completed'));
fromFunction$(() => [1, 2, 3]).subscribe((value) => console.log(`Value is '${value}'`), null, () => console.log('Completed'));
fromFunction$(() => { throw 'Something' }).subscribe(null, (error) => console.error(`Error: ${error}`));
Run Code Online (Sandbox Code Playgroud)

给出:

Value is '0'
Completed

Value is '1,2,3'
Completed

Error: Something
Run Code Online (Sandbox Code Playgroud)

直到这样的实现存在。


mar*_*tin 3

实际上,我认为最好的选择是使用Observable.create,因为它是同步和异步初始值最通用的解决方案。

如果您确定要使用同步函数,则可以使用startWith()运算符(仅当getSomeValue()所有观察者的返回值都相同时这才有意义)。

使用Observable.bindCallbackObservable 作为源当然是可行的,但我个人建议避免使用它,因为它会使你的代码很难理解,而且通常没有必要,因为你可以只使用Observable.create.