这将每5秒发出一次滴答声.
Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
.subscribe(tick -> Log.d(TAG, "tick = "+tick));
Run Code Online (Sandbox Code Playgroud)
要停止它你可以使用
Schedulers.shutdown();
Run Code Online (Sandbox Code Playgroud)
但随后所有调度程序停止,以后无法恢复计时.我怎样才能停止并恢复"优雅地"发出的声音?
我正在关注Coursera的Scala课程中的功能反应编程,我们处理RxScala Observables(基于RxJava).
据我所知,Play Iteratee的库看起来有点像RxScala Observables,其中Observables有点像Enumerators和Observers有点像Iteratees.
还有Scalaz Stream库,也许还有其他一些?
所以我想知道所有这些库之间的主要区别.在哪种情况下,一个可能比另一个更好?
PS:我想知道为什么Play Iteratees库没有被Martin Odersky选择用于他的课程,因为Play在Typesafe堆栈中.这是否意味着Martin喜欢RxScala而不是Play Iteratees?
编辑:在无流举措刚刚宣布,作为一种尝试standardize a common ground for achieving statically typed, high-performance, low latency, asynchronous streams of data with built-in non-blocking back pressure
今天我发现,对于Java中的并发性我们有很好的框架Akka,我也发现,有一个反应式编程框架就像在应用程序中RxJava执行一样multithreading.但我还是很困惑!为什么两者都优于Java Concurrency框架?
如今响应式编程是成熟的话题,大多数语言都支持Functional Reactive Programing像Netflix有关的API提供Reactive programming了一种以上的语言.Rxjava是用于API之一java,scala等等.根据RxJava他们内部使用演员为维护multithreading和Akka也使用Actors了multithreading编程.
那么,它们之间的区别Akka和Reactive Programming方法之间的区别是Java Concurrency什么?
在以下情况下使用Observable.justvs 时我得到相同的输出Observable.from:
public void myfunc() {
//swap out just for from here and i get the same results,why ?
Observable.just(1,2,3).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d("","all done. oncompleted called");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.d("","here is my integer:"+integer.intValue());
}
});
}
Run Code Online (Sandbox Code Playgroud)
我想只是just假设发出一个项目,from并在某种列表中发出项目.有什么不同 ?我也注意到这一点just并且from仅涉及有限数量的论点.所以Observable.just(1,2,3,4,5,6,7,8,-1,-2)没关系,但Observable.just(1,2,3,4,5,6,7,8,-1,-2,-3)失败了.同样,我必须将它包装在一个列表或数组中.我只是好奇他们为什么不能定义无限的论点.
更新:我试验过,看到它just不采用数组结构,它just需要参数.from收集一个集合.以下是适用于from但不适用于just …
我在我的Android应用程序中使用RxJava,我想从数据库加载数据.
通过这种方式,我创建了一个新的Observable,使用Observable.create()它返回一个列表EventLog
public Observable<List<EventLog>> loadEventLogs() {
return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
@Override
public void call(Subscriber<? super List<EventLog>> subscriber) {
List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
List<EventLog> eventLogs = new ArrayList<>(logs.size());
for (int i = 0; i < logs.size(); i++) {
eventLogs.add(new EventLog(logs.get(i)));
}
subscriber.onNext(eventLogs);
}
});
}
Run Code Online (Sandbox Code Playgroud)
虽然它可以正常工作,但我读到使用Observable.create()它实际上并不是Rx Java的最佳实践(参见此处).
所以我用这种方式改变了这个方法.
public Observable<List<EventLog>> loadEventLogs() {
return Observable.fromCallable(new Func0<List<EventLog>>() {
@Override
public List<EventLog> call() {
List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
List<EventLog> eventLogs = new ArrayList<>(logs.size());
for (int …Run Code Online (Sandbox Code Playgroud) 我有一个关于如何取消订阅可观察量的问题.我有两个代码,我不确定哪个更好.
示例1 - >流完成后取消订阅订阅者:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
progressdialog.dissmiss();
unsubscribe();
}
@Override
public void onError(Throwable e) {
progressdialog.dissmiss();
}
@Override
public void onNext(String s) {
// do something with data
}
}
Run Code Online (Sandbox Code Playgroud)
示例2 - >销毁活动后取消订阅订阅:
private void test(){
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
progressdialog.dissmiss();
}
@Override
public void onError(Throwable e) {
progressdialog.dissmiss();
}
@Override
public void onNext(String s) {
// do something with data
}
};
subscription …Run Code Online (Sandbox Code Playgroud) 我正在使用Retrofit 2.0来进行返回Observables的api调用.当呼叫完成并且响应符合预期时,一切正常.现在让我们说我们有一个错误响应,它会抛出一个onError.我想阅读响应正文,即使它是一个错误.
例
@FormUrlEncoded
@POST("tokenLogin")
Observable<LoginResponse> loginWithToken(
@Field("token") String pin
);
Run Code Online (Sandbox Code Playgroud)
当请求和响应有效时,我得到正确的observable,并在出现错误时按预期调用onError.
正确回应:
{ "status" : "authenticated" }
Run Code Online (Sandbox Code Playgroud)
Observable将其转换为正确的Observable,我可以将响应读作LoginResponse对象.
现在,错误响应如下:
{ "errorMessage" : "You need to take some xyz action" }
Run Code Online (Sandbox Code Playgroud)
我想阅读该错误响应并将消息显示给用户.我该怎么做呢?
我想了解细节RxJava.
直觉上我期望first()并且take(1)平等并且做同样的事情.但是通过挖掘源代码first()定义为take(1).single().
这有什么single()用?还不take(1)保证输出单个项目吗?
RxJava最近推出了Single.有没有办法将已经存在的Observable(几乎是单个)转换为Single而不修改原始observable的源?
例如,我有一个api服务类,其中包含一个返回Observable的方法 - 它本质上是从远程资源中获取用户.说我无法修改服务.我想在其他地方消费,但返回单身.我该怎么做呢?
捏更多的背景
RxJava最近引入了Single的概念,它或多或少是一个Rx友好的简单回调(即一个Observable发出一个对象或一个错误)(在这里阅读更多关于它 - http://reactivex.io/documentation/single.html)
rx-java ×10
android ×4
java ×4
rx-android ×4
retrofit ×2
akka ×1
concurrency ×1
iterate ×1
observable ×1
rotation ×1
scala ×1