标签: rx-java

如何停止和恢复Observable.interval发出滴答声

这将每5秒发出一次滴答声.

Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
            .subscribe(tick -> Log.d(TAG, "tick = "+tick));
Run Code Online (Sandbox Code Playgroud)

要停止它你可以使用

Schedulers.shutdown();
Run Code Online (Sandbox Code Playgroud)

但随后所有调度程序停止,以后无法恢复计时.我怎样才能停止并恢复"优雅地"发出的声音?

java rx-java rx-android

33
推荐指数
3
解决办法
4万
查看次数

Scala流媒体库差异(Reactive Streams/Iteratee/RxScala/Scalaz ...)

我正在关注Coursera的Scala课程中的功能反应编程,我们处理RxScala Observables(基于RxJava).

据我所知,Play Iteratee的库看起来有点像RxScala Observables,其中Observables有点像Enumerators和Observers有点像Iteratees.

还有Scalaz Stream库,也许还有其他一些?


所以我想知道所有这些库之间的主要区别.在哪种情况下,一个可能比另一个更好?


PS:我想知道为什么Play Iteratees库没有被Martin Odersky选择用于他的课程,因为Play在Typesafe堆栈中.这是否意味着Martin喜欢RxScala而不是Play Iteratees?


编辑:无流举措刚刚宣布,作为一种尝试standardize a common ground for achieving statically typed, high-performance, low latency, asynchronous streams of data with built-in non-blocking back pressure

scala iterate playframework-2.0 rx-java scalaz-stream

32
推荐指数
2
解决办法
2680
查看次数

Java Concurrency,Akka和RxJava之间的区别?

今天我发现,对于Java中的并发性我们有很好的框架Akka,我也发现,有一个反应式编程框架就像在应用程序中RxJava执行一样multithreading.但我还是很困惑!为什么两者都优于Java Concurrency框架?

如今响应式编程是成熟的话题,大多数语言都支持Functional Reactive ProgramingNetflix有关的API提供Reactive programming了一种以上的语言.Rxjava是用于API之一java,scala等等.根据RxJava他们内部使用演员为维护multithreadingAkka也使用Actorsmultithreading编程.

那么,它们之间的区别AkkaReactive Programming方法之间的区别是Java Concurrency什么?

java concurrency akka rx-java

32
推荐指数
1
解决办法
2万
查看次数

RxJava - Just vs From

在以下情况下使用Observable.justvs 时我得到相同的输出Observable.from:

 public void myfunc() {
 //swap out just for from here and i get the same results,why ?
        Observable.just(1,2,3).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("","all done. oncompleted called");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d("","here is my integer:"+integer.intValue());
            }
        });

    }
Run Code Online (Sandbox Code Playgroud)

我想只是just假设发出一个项目,from并在某种列表中发出项目.有什么不同 ?我也注意到这一点just并且from仅涉及有限数量的论点.所以Observable.just(1,2,3,4,5,6,7,8,-1,-2)没关系,但Observable.just(1,2,3,4,5,6,7,8,-1,-2,-3)失败了.同样,我必须将它包装在一个列表或数组中.我只是好奇他们为什么不能定义无限的论点.

更新:我试验过,看到它just不采用数组结构,它just需要参数.from收集一个集合.以下是适用于from但不适用于just …

java rx-java

32
推荐指数
2
解决办法
1万
查看次数

不使用Observable.create创建Observable

我在我的Android应用程序中使用RxJava,我想从数据库加载数据.

通过这种方式,我创建了一个新的Observable,使用Observable.create()它返回一个列表EventLog

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
        @Override
        public void call(Subscriber<? super List<EventLog>> subscriber) {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            subscriber.onNext(eventLogs);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

虽然它可以正常工作,但我读到使用Observable.create()它实际上并不是Rx Java的最佳实践(参见此处).

所以我用这种方式改变了这个方法.

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.fromCallable(new Func0<List<EventLog>>() {
        @Override
        public List<EventLog> call() {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int …
Run Code Online (Sandbox Code Playgroud)

android reactive-programming rx-java

32
推荐指数
1
解决办法
6849
查看次数

何时取消订阅订阅

我有一个关于如何取消订阅可观察量的问题.我有两个代码,我不确定哪个更好.

示例1 - >流完成后取消订阅订阅者:

Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            progressdialog.dissmiss();
            unsubscribe();
        }

        @Override
        public void onError(Throwable e) {
            progressdialog.dissmiss();
        }

        @Override
        public void onNext(String s) {
            // do something with data
        }
    }
Run Code Online (Sandbox Code Playgroud)

示例2 - >销毁活动后取消订阅订阅:

private void test(){
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            progressdialog.dissmiss();
        }

        @Override
        public void onError(Throwable e) {
            progressdialog.dissmiss();
        }

        @Override
        public void onNext(String s) {
            // do something with data
        }
    };

    subscription …
Run Code Online (Sandbox Code Playgroud)

android reactive-programming rx-java rx-android

31
推荐指数
3
解决办法
3万
查看次数

当使用Retrofit 2.0 Observables时出错时,如何获取Response body

我正在使用Retrofit 2.0来进行返回Observables的api调用.当呼叫完成并且响应符合预期时,一切正常.现在让我们说我们有一个错误响应,它会抛出一个onError.我想阅读响应正文,即使它是一个错误.

@FormUrlEncoded
@POST("tokenLogin")
Observable<LoginResponse> loginWithToken(
        @Field("token") String pin
);
Run Code Online (Sandbox Code Playgroud)

当请求和响应有效时,我得到正确的observable,并在出现错误时按预期调用onError.

正确回应:

{ "status" : "authenticated" }
Run Code Online (Sandbox Code Playgroud)

Observable将其转换为正确的Observable,我可以将响应读作LoginResponse对象.

现在,错误响应如下:

{ "errorMessage" : "You need to take some xyz action" }
Run Code Online (Sandbox Code Playgroud)

我想阅读该错误响应并将消息显示给用户.我该怎么做呢?

android rx-java retrofit rx-android

30
推荐指数
3
解决办法
2万
查看次数

如何在Activity中使用Retrofit和RxJava/RxAndroid处理旋转?

在这里读到我们可以使用一些全局缓存来处理旋转.

您可以通过使用缓存或重放Observable运算符来防止这种情况,同时确保Observable以某种方式在Activity生命周期中存活(例如,通过将其存储在全局缓存中,在片段中等)

但是如何实现呢?有人可以指点我怎么做的一些例子?或者您可能知道一些更好的方法来处理配置更改?

编辑:

目前我已经找到了很多方法,但最终我使用了RxBus实现.你可以在这里找到使用RxBus的好例子.

android rotation rx-java retrofit rx-android

29
推荐指数
1
解决办法
1万
查看次数

first()和take(1)之间的区别

我想了解细节RxJava.

直觉上我期望first()并且take(1)平等并且做同样的事情.但是通过挖掘源代码first()定义为take(1).single().

这有什么single()用?还不take(1)保证输出单个项目吗?

java reactive-programming rx-java

29
推荐指数
2
解决办法
8664
查看次数

RxJava引入了Single <T>.如何将Observable <T>转换为Single <T>?

RxJava最近推出了Single.有没有办法将已经存在的Observable(几乎是单个)转换为Single而不修改原始observable的源?

例如,我有一个api服务类,其中包含一个返回Observable的方法 - 它本质上是从远程资源中获取用户.说我无法修改服务.我想在其他地方消费,但返回单身.我该怎么做呢?

捏更多的背景

RxJava最近引入了Single的概念,它或多或少是一个Rx友好的简单回调(即一个Observable发出一个对象或一个错误)(在这里阅读更多关于它 - http://reactivex.io/documentation/single.html)

observable rx-java

29
推荐指数
3
解决办法
2万
查看次数