标签: reactive-extensions-js

rxjs flatmap缺失

我尝试链接多个rx.js可观察量并传递数据.Flatmap应该是拟合运算符但是导入

import { Observable } from 'rxjs/Observable';
Run Code Online (Sandbox Code Playgroud)

找不到:

Error TS2339: Property 'flatmap' does not exist on type 'Observable<Coordinates>'
Run Code Online (Sandbox Code Playgroud)

5.0.0-beta.6使用rx.js的版本.

public getCurrentLocationAddress():Observable<String> {
    return Observable.fromPromise(Geolocation.getCurrentPosition())
      .map(location => location.coords)
      .flatmap(coordinates => {
        console.log(coordinates);
        return this.http.request(this.geoCodingServer + "/json?latlng=" + coordinates.latitude + "," + coordinates.longitude)
          .map((res: Response) => {
                       let data = res.json();
                       return data.results[0].formatted_address;
              });
      });
  }
Run Code Online (Sandbox Code Playgroud)

reactive-extensions-js observable rxjs

57
推荐指数
5
解决办法
4万
查看次数

Angular2 RxJS得到'Observable_1.Observable.fromEvent不是函数'错误

我正在使用AngularJS 2 Beta 0,我正在尝试从窗口对象上的事件创建一个RxJS Observable.我相信我知道在我的服务中将事件捕获为Observable的公式:

var observ = Observable.fromEvent(this.windowHandle, 'hashchange');
Run Code Online (Sandbox Code Playgroud)

问题是,每次我尝试运行此代码时,都会收到一条错误消息,指出'fromEvent'不是函数.

Uncaught EXCEPTION: Error during evaluation of "click"
ORIGINAL EXCEPTION: TypeError: Observable_1.Observable.fromEvent is not a function
Run Code Online (Sandbox Code Playgroud)

这似乎意味着我现在没有import正确处理我,因为RxJS没有包含在Angular 2的构建中,尽管我的应用程序的其余部分正常运行,这对我来说意味着RxJS应该是它应该的位置.

我在服务中的导入如下所示:

import {Observable} from 'rxjs/Observable';
Run Code Online (Sandbox Code Playgroud)

虽然我也尝试使用它(对代码进行适当的更改),但结果相同:

import {FromEventObservable} from 'rxjs/observable/fromEvent';
Run Code Online (Sandbox Code Playgroud)

我的Index.html中有以下配置:

<script>
    System.config({
        map: {
            rxjs: 'node_modules/rxjs'
        },
        packages: {
            'app': {defaultExtension: 'js'},
            'rxjs': {defaultExtension: 'js'}
        }
    });
    System.import('app/app');
</script>
Run Code Online (Sandbox Code Playgroud)

有人能告诉我我做错了什么吗?

javascript reactive-extensions-js rxjs typescript angular

50
推荐指数
2
解决办法
4万
查看次数

SignalR与Reactive Extensions

SignalR和Reactive Extensions一样吗?你能解释为什么或为什么不解释?

system.reactive reactive-extensions-js signalr

42
推荐指数
1
解决办法
6194
查看次数

使用来自另一个observable的值过滤可观察对象

我有两个可观察量:

  1. 表示复选框输入列表的observable.
  2. 表示来自服务器的事件流的可观察对象.

我想使用第一个值中的值来过滤第二个observable.

从服务器接收的值包括tag属性,该属性对应于复选框列表中的值.上述两者的组合产生的可观察性只会产生服务器的值,该服务器的tag属性包含在勾选复选框的集合中.

reactive-extensions-js rxjs

30
推荐指数
2
解决办法
1万
查看次数

发布可观察量的最后一个值

我有一个热门的观察者(在这种情况下是一个主题):

var subject = new Rx.Subject();
Run Code Online (Sandbox Code Playgroud)

我想创建另一个observable,每次创建新订阅时,都会立即触发生成的最后一个值.

所以在伪代码中:

var myObservableWithLastValue = subject.publishLast();

subject.onNext(3);

myObservableWithLastValue.subscribe(function(x){
    console.log(x); //should write 3
});

myObservableWithLastValue.subscribe(function(x){
    console.log(x); //should write 3, too
});

subject.onNext(4);

myObservableWithLastValue.subscribe(function(x){
    console.log(x); //should write 4
});
Run Code Online (Sandbox Code Playgroud)

这大致是我想要的,似乎有效.但是,我想必须有一些内置的机制来实现相同的目标

Rx.Observable.prototype.keepLatest = function () {
    var latestValue;

    var disposable = this.subscribe(function (value) {
        latestValue = value;
    });

    return Rx.Observable.create(function (observer) {
        observer.onNext(latestValue);
        return disposable.dispose;
    });
};
Run Code Online (Sandbox Code Playgroud)

reactive-extensions-js rxjs

22
推荐指数
2
解决办法
2万
查看次数

在Ajax错误之后RxJS继续侦听

当内部可观察错误(Ajax请求)时,RxJs停止侦听单击事件.我试图弄清楚如何将事件监听器挂钩到按钮单击事件并优雅地处理内部ajax错误.

这是我的示例代码和plunkr的链接

var input = $("#testBtn");
var test = Rx.Observable.fromEvent(input,'click');

var testAjax = function() {
  return Rx.Observable.range(0,3).then(function(x){ 
    if(x==2)throw "RAWR"; //Simulating a promise error.
    return x;
  });
}

test.map(function(){
  return Rx.Observable.when(testAjax());
})
.switchLatest()
.subscribe(
  function (x) {
      console.log('Next: ', x);
  },
  function (err) {
      console.log('Error: ' + err);   
  },
  function () {
      console.log('Completed');   
  });
Run Code Online (Sandbox Code Playgroud)

http://plnkr.co/edit/NGMB7RkBbpN1ji4mfzih

javascript ajax error-handling reactive-extensions-js rxjs

20
推荐指数
2
解决办法
9424
查看次数

GHCJS:如何使用FFI导入高阶javascript函数?

如何在GHCJS中导入如下Javascript函数?

xs.subscribe(function(x) { console.log(x) })
Run Code Online (Sandbox Code Playgroud)

我尝试了以下各种组合而没有成功:

data Observable_
data Disposable_

type Observable a = JSRef Observable_
type Disposable = JSRef ()

foreign import javascript unsafe "$1.subscribe($2)"
  rx_subscribe :: Observable a -> JSRef (a -> IO()) -> IO Disposable
Run Code Online (Sandbox Code Playgroud)

感谢任何帮助,并链接到GHCJS FFI的文档.

谢谢

javascript haskell reactive-extensions-js rxjs ghcjs

18
推荐指数
1
解决办法
1216
查看次数

RxJs:轮询直到完成间隔或接收到正确的数据

如何在浏览器中使用RxJs执行以下方案:

  • 将数据提交到队列进行处理
  • 找回工作ID
  • 每1秒轮询另一个端点,直到结果可用或已经过60秒(然后失败)

我提出的中间解决方案:

 Rx.Observable
    .fromPromise(submitJobToQueue(jobData))
    .flatMap(jobQueueData => 
      Rx.Observable
            .interval(1000)
            .delay(5000)
            .map(_ => jobQueueData.jobId)
            .take(55)
    )
    .flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
    .filter(result => result.completed)
    .subscribe(
      result => console.log('Result', result),
      error =>  console.log('Error', error)
    );
Run Code Online (Sandbox Code Playgroud)
  1. 有没有中间变量的方法在数据到达或发生错误时停止计时器?我现在可以介绍新的observable然后使用takeUntil
  2. flatMap使用此语义正确的?也许整个事情应该被改写而不是被束缚flatMap

javascript reactive-extensions-js rxjs

18
推荐指数
2
解决办法
8521
查看次数

我可以用rx.js观察数组的加法吗?

来自array Rx wiki on github

coffee> rext = require 'rx'                                                 
coffee> arr = [1..5]                                                 
[ 1, 2, 3, 4, 5 ]                                                    
coffee> obs = rext.Observable.fromArray(arr)                         
{ _subscribe: [Function] }                                           
coffee> obs.subscribe( (x) -> console.log("added value: " + x))      
added value: 1                                                       
added value: 2                                                       
added value: 3                                                       
added value: 4                                                       
added value: 5                                                       
{ isStopped: true,                                                   
  observer:                                                          
   { isStopped: true,                                                
     _onNext: [Function],                                            
     _onError: [Function: defaultError],                             
     _onCompleted: [Function: noop] },                               
  m: { isDisposed: true, current: null } }                           
coffee> arr.push(12)    # …
Run Code Online (Sandbox Code Playgroud)

javascript reactive-extensions-js coffeescript observable rxjs

17
推荐指数
2
解决办法
1万
查看次数

如何构造rxjs代码

如何构建rxjs应用程序?大约有一百个玩具介绍示例,但不是一个完整应用程序的单个示例,其中包含小部件,子小部件等,显示整个应用程序中的数据流.

例如,假设你有一个具有某种状态的观察者.您需要将其传递给窗口小部件.该小部件具有需要该状态部分的子小部件.你做订阅吗?

sub = state.subscribe(widget)
Run Code Online (Sandbox Code Playgroud)

现在'小部件'在monad之外.子窗口小部件不能在状态上使用可观察的方法.如果将窗口小部件作为副作用运行,则会出现同样的问题.

state.doAction(widget)
Run Code Online (Sandbox Code Playgroud)

那么你将流传递给小部件吗?如果是这样,你会得到什么回报?

what = widget(state)
Run Code Online (Sandbox Code Playgroud)

小部件是否订阅状态并返回一次性用户?它是否返回从状态派生的流?如果是这样,它里面有什么?你尝试从所有的部件/ subwidgets /子subwidgets具有广泛用途的SelectMany(身份)来获得你为了揭开整个事情关闭订阅最终应用程序流收集所有的流在一起吗?

如果窗口小部件根据状态按需创建子窗口小部件,窗口小部件如何管理其子窗口小部件?我一直在尝试使用groupBy()的解决方案,每个子窗口小部件都有一个组,但是从嵌套的observable管理所有订阅或流是一个令人难以置信的噩梦.

即使是整个应用程序的一个示例也会有所帮助.

javascript reactive-extensions-js rxjs

17
推荐指数
1
解决办法
2526
查看次数