.subscribe(
new Action1<Response>() {
@Override
public void call(Response response) {
if (response.isSuccess())
//handle success
else
//throw an Throwable(reponse.getMessage())
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
//handle Throwable throw from onNext();
}
}
);
Run Code Online (Sandbox Code Playgroud)
我不想处理(!response.isSuccess())在onNext().我怎么能把它扔到onError()另一个扔掉的处理?
我是RXJava/RXAndroid的新手.我想实现这个案例:根据RXJava中的某些条件选择不同的方式.例如,首先,我从网络获取用户信息,如果这是VIP用户,我将继续从网络获取更多信息或只在主线程中显示一些信息(打破链.)这里的流程图:https: //i.stack.imgur.com/0hztR.png
我做了一些搜索,只发现"switchIfEmpty"可能有所帮助.我写下面的代码:
getUserFromNetwork("userId")
.flatMap(new Function<User, ObservableSource<User>>() {
@Override
public ObservableSource<User> apply(User user) throws Exception {
if(!user.isVip){
//show user info on MainThread!
return Observable.empty();
}else{
return getVipUserFromNetwork("userId");
}
}
}).switchIfEmpty(new ObservableSource<User>() {
@Override
public void subscribe(Observer<? super User> observer) {
//show user info in main thread
//just break the chain for normal user
observer.onComplete();
}
}).doOnNext(new Consumer<User>() {
@Override
public void accept(User user) throws Exception {
//show vip user info in main thread
}
}).subscribe();
Run Code Online (Sandbox Code Playgroud)
有更简单的方法来实现这一目标吗?
谢谢!
我正在使用SqlBrite/SqlDelight在RxJava中实现存储库模式以进行脱机数据存储并为Http请求进行改造
这是一个样本:
protected Observable<List<Item>> getItemsFromDb() {
return database.createQuery(tableName(), selectAllStatement())
.mapToList(cursor -> selectAllMapper().map(cursor));
}
public Observable<List<Item>>getItems(){
Observable<List<Item>> server = getRequest()
.doOnNext(items -> {
BriteDatabase.Transaction transaction = database.newTransaction();
for (Item item : items){
database.insert(tableName(), contentValues(item));
}
transaction.markSuccessful();
transaction.end();
})
.flatMap(items -> getItemsFromDbById())
.delaySubscription(200, TimeUnit.MILLISECONDS);
Observable<List<Item>> db = getItemsFromDbById(id)
.filter(items -> items != null && items.size() > 0);
return Observable.amb(db, server).doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
}
Run Code Online (Sandbox Code Playgroud)
当前实现用于Observable.amb获取最新的2个流并返回db流,以防万一db有数据或服务器.为了防止在没有互联网的情况下早期失败,server有一个delaySubscription上面的200ms.
我尝试使用, …
我是ReactiveX的新手.我是从源代码中学习的.一切都很清楚但突然间我得到了一个名为"Consumer"的词,它是一个界面.它被用来代替观察者.有人能告诉我它到底是做什么的吗?
我遵循了几个链接,但他们只是说了一个语句Consumer是一个接受单个值的功能接口(回调). 我想知道它的确切工作.
我正在尝试用反应异步postgres-async-driver替换PostgreSQL数据库轮询器并将新插入的行流式传输到Spring 5 Webflux Reactive websocket客户端,例如Josh Long 在这里演示并基于SébastienDeleuze的 spring-reactive-playground的很棒的示例.
我Publisher获得第一个row,但后来没有返回后续行.是问题,我Observable,我Publisher,或者我如何使用Postgres的,异步驱动程序Db?
public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
return
// com.github.pgasync.Db
db.queryRows(sql)
// ~RowMapper method
.map(row -> mapRowToDto(row))
// serialize dto to String for websocket
.map(dto -> { return objectMapper.writeValueAsString(dto); })
// finally, write to websocket session
.map(str -> { return session.textMessage((String) str);
});
}
Run Code Online (Sandbox Code Playgroud)
然后,Observable我WebSocketHandler使用RxReactiveStream.toPublisher转换器连接到我:
@Bean
WebSocketHandler dbWebSocketHandler() {
return …Run Code Online (Sandbox Code Playgroud) 我的代码是这样的:
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe ({
adapter.notifyDataSetChanged()
})
Run Code Online (Sandbox Code Playgroud)
但我收到一个错误:只有创建视图层次结构的原始线程才能触及其视图. 所以我把它改成:
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe ({
runOnUiThread(Runnable {
adapter.notifyDataSetChanged()
})
}
Run Code Online (Sandbox Code Playgroud)
这很有道理.所以我很困惑.我曾经想过.observeOn(AndroidSchedulers.mainThread())订阅块中的代码在ui线程上运行,但我是如何得到这个错误的?
我已经完成了这个和这篇文章.所以我真的同意主持人不应该知道Android特定事情的第二篇文章.所以我在想的是将互联网检查放在服务层.我正在使用Rx Java进行网络调用,所以我可以在进行服务调用之前进行网络检查,这样我就需要手动抛出和IOException,因为我需要在网络不可用时在视图上显示错误页面,另一个选择是我为没有互联网创建自己的错误类
Observable<PaginationResponse<Notification>> response = Observable.create(new Observable.OnSubscribe<PaginationResponse<Notification>>() {
@Override
public void call(Subscriber<? super PaginationResponse<Notification>> subscriber) {
if (isNetworkConnected()) {
Call<List<Notification>> call = mService.getNotifications();
try {
Response<List<Notification>> response = call.execute();
processPaginationResponse(subscriber, response);
} catch (IOException e) {
e.printStackTrace();
subscriber.onError(e);
}
} else {
//This is I am adding manually
subscriber.onError(new IOException);
}
subscriber.onCompleted();
}
});
Run Code Online (Sandbox Code Playgroud)
我的另一种方法是将拦截器添加到OkHttpClient并将其设置为改装
OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
builder.addInterceptor(new Interceptor() {
@Override
public Response intercept(Chain chain) throws IOException {
if (!isNetworkConnected()) {
throw new …Run Code Online (Sandbox Code Playgroud) 我有这样的java代码
mDataManager.getObservable("hello").subscribe( subscriber );
Run Code Online (Sandbox Code Playgroud)
我想要verify以下Observable.subscribe()
我试图模仿getObservable()和verify
Observable<Response> res = mock(Observable.class);
when(mDataManager.getObservable("hello")).thenReturn(res);
verify(res).subscribe();
Run Code Online (Sandbox Code Playgroud)
但是有一个错误
Caused by: java.lang.IllegalStateException: onSubscribe function can not be null.
at rx.Observable.subscribe(Observable.java:8167)
at rx.Observable.subscribe(Observable.java:8158)
at rx.Observable.subscribe(Observable.java:7962)
....
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: omni.neo.hk.omniapiservice.v4.model.external.UserLoginBean.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109)
at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:187)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:82)
... 48 more
Run Code Online (Sandbox Code Playgroud)
我认为mock这里不可能是一个Observable,但是如果没有一个模拟的Observable我就做不到verify(res).subscribe()
在这种情况下的任何建议?
要从数据库中获取数据,我CursorLoader在应用程序中使用.一旦onLoadFinished()回调方法调用app的逻辑,就将Cursor对象转换为List业务模型需求中的对象.如果有大量数据,那么转换(繁重操作)需要一些时间.这会降低UI线程的速度.我尝试Thread使用RxJava2传递Cursor对象在非UI中启动转换,但得到Exception:
Caused by: android.database.StaleDataException: Attempting to access a closed CursorWindow.Most probable cause: cursor is deactivated prior to calling this method.
Run Code Online (Sandbox Code Playgroud)
这是Fragment代码的一部分:
@Override
public Loader<Cursor> onCreateLoader(int id, Bundle args) {
QueryBuilder builder;
switch (id) {
case Constants.FIELDS_QUERY_TOKEN:
builder = QueryBuilderFacade.getFieldsQB(activity);
return new QueryCursorLoader(activity, builder);
default:
return null;
}
}
@Override
public void onLoadFinished(Loader<Cursor> loader, Cursor cursor) {
if (cursor.getCount() > 0) {
getFieldsObservable(cursor) …Run Code Online (Sandbox Code Playgroud) rx-java ×10
android ×7
java ×3
rx-android ×3
retrofit ×2
architecture ×1
event-bus ×1
mockito ×1
mvp ×1
observable ×1
postgresql ×1
reactivex ×1
retrofit2 ×1
rx-java2 ×1
spring ×1
sqlbrite ×1
sqldelight ×1
sqlite ×1
unit-testing ×1
websocket ×1