我正在为我的应用程序开发网络.所以我决定尝试Square的Retrofit.我看到他们支持简单Callback
@GET("/user/{id}/photo")
void getUserPhoto(@Path("id") int id, Callback<Photo> cb);
Run Code Online (Sandbox Code Playgroud)
和RxJava的 Observable
@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);
Run Code Online (Sandbox Code Playgroud)
乍一看两者看起来都非常相似,但是当它实现时它变得有趣......
虽然简单的回调实现看起来类似于:
api.getUserPhoto(photoId, new Callback<Photo>() {
@Override
public void onSuccess() {
}
});
Run Code Online (Sandbox Code Playgroud)
这非常简单明了.随着Observable它迅速变得冗长和复杂.
public Observable<Photo> getUserPhoto(final int photoId) {
return Observable.create(new Observable.OnSubscribeFunc<Photo>() {
@Override
public Subscription onSubscribe(Observer<? super Photo> observer) {
try {
observer.onNext(api.getUserPhoto(photoId));
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return Subscriptions.empty();
}
}).subscribeOn(Schedulers.threadPoolForIO());
}
Run Code Online (Sandbox Code Playgroud)
那不是它.你仍然需要做这样的事情:
Observable.from(photoIdArray)
.mapMany(new Func1<String, Observable<Photo>>() {
@Override
public Observable<Photo> call(Integer s) { …Run Code Online (Sandbox Code Playgroud) 在RxJava中有5种不同的调度程序可供选择:
immediate():创建并返回一个在当前线程上立即执行工作的Scheduler.
trampoline():创建并返回一个调度程序,该调度程序对当前工作完成后要执行的当前线程进行排队.
newThread():创建并返回一个Scheduler,为每个工作单元创建一个新的Thread.
computation():创建并返回用于计算工作的Scheduler.这可以用于事件循环,处理回调和其他计算工作.不要在此调度程序上执行IO绑定的工作.使用调度程序.io()代替.
io():创建并返回一个用于IO绑定工作的Scheduler.该实现由Executor线程池支持,该线程池将根据需要增长.这可用于异步执行阻塞IO.不要在此调度程序上执行计算工作.使用调度程序.计算()而不是.
前3个调度程序非常自我解释; 但是,我对计算和io有点困惑.
java.io)和files(java.nio.files)吗?它用于数据库查询吗?它是用于下载文件还是访问REST API?我想知道的区别
CompletableFuture,Future和Observable RxJava.
我所知道的都是异步但是
Future.get() 阻止线程
CompletableFuture 给出了回调方法
RxJava Observable--- CompletableFuture与其他好处相似(不确定)
例如:如果客户端需要进行多次服务调用,那么当我们使用Futures(Java)时Future.get()将按顺序执行...想知道它在RxJava中的表现如何...
文档http://reactivex.io/intro.html说
很难使用Futures来优化组合条件异步执行流程(或者不可能,因为每个请求的延迟在运行时会有所不同).当然,这可以完成,但它很快变得复杂(因此容易出错)或者过早地阻塞Future.get(),这消除了异步执行的好处.
真的很想知道如何RxJava解决这个问题.我发现从文档中很难理解.
你什么时候在RxJava中使用map vs flatMap?
比方说,我们想将包含JSON的文件映射到包含JSON的字符串中 -
使用map,我们必须以某种方式处理Exception.但是如何?:
Observable.from(jsonFile).map(new Func1<File, String>() {
@Override public String call(File file) {
try {
return new Gson().toJson(new FileReader(file), Object.class);
} catch (FileNotFoundException e) {
// So Exception. What to do ?
}
return null; // Not good :(
}
});
Run Code Online (Sandbox Code Playgroud)
使用flatMap,它更加冗长,但我们可以将问题转发到Observables链中,如果我们选择其他地方甚至重试,则可以处理错误:
Observable.from(jsonFile).flatMap(new Func1<File, Observable<String>>() {
@Override public Observable<String> call(final File file) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
try {
String json = new Gson().toJson(new FileReader(file), Object.class);
subscriber.onNext(json);
subscriber.onCompleted();
} …Run Code Online (Sandbox Code Playgroud) 该rxjava DOC switchmap的定义是相当含糊,并将其链接到同一页作为flatmap.这两个运营商有什么区别?
Java 8流是否类似于RxJava observables?
Java 8流定义:
新
java.util.stream包中的类提供Stream API以支持对元素流的功能样式操作.
我一直在寻找新的rx java 2,我不太确定我理解了这个想法backpressure......
我知道我们有Observable没有backpressure支持,Flowable而且有它.
因此,基于例如,可以说我有flowable有interval:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Run Code Online (Sandbox Code Playgroud)
这将在大约128个值之后崩溃,这很明显我消耗的速度比获取项目慢.
但后来我们也一样 Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Run Code Online (Sandbox Code Playgroud)
这根本不会崩溃,即使我延迟消耗它仍然有效.为了让Flowable工作让我说我放onBackpressureDrop操作员,崩溃已经消失,但并非所有值都被发出.
所以我目前无法找到答案的基本问题是我为什么要关心backpressure何时可以使用普通Observable仍然可以在不管理的情况下获得所有值buffer?或者从另一方面来看,有什么优势backpressure可以帮助我管理和处理消费?
我正在尝试升级到Retrofit 2.0并在我的android项目中添加RxJava.我正在进行api调用,并希望在服务器发出错误响应的情况下检索错误代码.
Observable<MyResponseObject> apiCall(@Body body);
Run Code Online (Sandbox Code Playgroud)
并在RxJava调用中:
myRetrofitObject.apiCall(body).subscribe(new Subscriber<MyResponseObject>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(MyResponseObject myResponseObject) {
//On response from server
}
});
Run Code Online (Sandbox Code Playgroud)
在Retrofit 1.9中,RetrofitError仍然存在,我们可以通过以下方式获得状态:
error.getResponse().getStatus()
Run Code Online (Sandbox Code Playgroud)
如何使用RxJava进行Retrofit 2.0?
任何人都可以用明确的例子解释RxJava中Observable,Completable和Single之间的区别吗?
在哪种情况下我们使用其中一个?
我在我的Android应用程序中使用rxjava来异步处理网络请求.现在我想在一段时间过后才重试失败的网络请求.
有没有办法在Observable上使用retry()但只能在一定延迟后重试?
有没有办法让Observable知道当前正在重试(而不是第一次尝试)?
我看了一下debounce()/ throttleWithTimeout(),但他们似乎做了不同的事情.
编辑:
我想我找到了一种方法,但是我会对确认这是正确的方法或其他更好的方法感兴趣.
我正在做的是:在我的Observable.OnSubscribe的call()方法中,在我调用Subscribers onError()方法之前,我只是让Thread睡眠所需的时间.所以,要每1000毫秒重试一次,我会这样做:
@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
try {
Log.d(TAG, "trying to load all products with pid: " + pid);
subscriber.onNext(productClient.getProductNodesForParentId(pid));
subscriber.onCompleted();
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e.printStackTrace();
}
subscriber.onError(e);
}
}
Run Code Online (Sandbox Code Playgroud)
由于此方法无论如何都在IO线程上运行,因此它不会阻止UI.我能看到的唯一问题是,即使是第一个错误也会报告延迟,所以即使没有重试(),也会出现延迟.如果延迟没有在错误之后应用,而是在重试之前(但不是在第一次尝试之前,显然),我会更喜欢它.
rx-java ×10
java ×4
android ×3
java-8 ×2
retrofit ×2
rx-android ×2
asynchronous ×1
flatmap ×1
java-stream ×1
mapping ×1
observable ×1
rx-java2 ×1