标签: rx-java

如何使用RxJava在onNext()中抛出错误

.subscribe(
    new Action1<Response>() {
        @Override
        public void call(Response response) {
            if (response.isSuccess())
            //handle success
            else
            //throw an Throwable(reponse.getMessage())
        }
    },
    new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            //handle Throwable throw from onNext();
        }
    }
);
Run Code Online (Sandbox Code Playgroud)

我不想处理(!response.isSuccess())onNext().我怎么能把它扔到onError()另一个扔掉的处理?

java rx-java

12
推荐指数
2
解决办法
1万
查看次数

如何在RX java链中使用"if-else"?

我是RXJava/RXAndroid的新手.我想实现这个案例:根据RXJava中的某些条件选择不同的方式.例如,首先,我从网络获取用户信息,如果这是VIP用户,我将继续从网络获取更多信息或只在主线程中显示一些信息(打破链.)这里的流程图:https: //i.stack.imgur.com/0hztR.png

我做了一些搜索,只发现"switchIfEmpty"可能有所帮助.我写下面的代码:

getUserFromNetwork("userId")
                .flatMap(new Function<User, ObservableSource<User>>() {
                    @Override
                    public ObservableSource<User> apply(User user) throws Exception {
                        if(!user.isVip){
                            //show user info on MainThread!
                            return Observable.empty();
                        }else{
                            return getVipUserFromNetwork("userId");
                        }
                    }
                }).switchIfEmpty(new ObservableSource<User>() {
                    @Override
                    public void subscribe(Observer<? super User> observer) {
                        //show user info in main thread
                        //just break the chain for normal user
                        observer.onComplete();
                    }
                }).doOnNext(new Consumer<User>() {
                    @Override
                    public void accept(User user) throws Exception {
                        //show vip user info in main thread
                    }
                }).subscribe();
Run Code Online (Sandbox Code Playgroud)

有更简单的方法来实现这一目标吗?

谢谢!

java android rx-java rx-android rx-java2

12
推荐指数
2
解决办法
1万
查看次数

使用SqlBrite/SqlDelight(脱机数据库)和Retrofit(Http请求)的存储库模式

我正在使用SqlBrite/SqlDelight在RxJava中实现存储库模式以进行脱机数据存储并为Http请求进行改造

这是一个样本:

protected Observable<List<Item>> getItemsFromDb() {
     return database.createQuery(tableName(), selectAllStatement())
             .mapToList(cursor -> selectAllMapper().map(cursor));
 }


public Observable<List<Item>>getItems(){
     Observable<List<Item>> server = getRequest()
                 .doOnNext(items -> {
                     BriteDatabase.Transaction transaction = database.newTransaction();
                     for (Item item : items){
                         database.insert(tableName(), contentValues(item));
                     }
                     transaction.markSuccessful();
                     transaction.end();
                 })
                 .flatMap(items -> getItemsFromDbById())
                 .delaySubscription(200, TimeUnit.MILLISECONDS);
         Observable<List<Item>> db = getItemsFromDbById(id)
                 .filter(items -> items != null && items.size() > 0);
     return Observable.amb(db, server).doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
 }
Run Code Online (Sandbox Code Playgroud)

当前实现用于Observable.amb获取最新的2个流并返回db流,以防万一db有数据或服务器.为了防止在没有互联网的情况下早期失败,server有一个delaySubscription上面的200ms.

我尝试使用, …

android rx-java retrofit sqlbrite sqldelight

12
推荐指数
1
解决办法
543
查看次数

使用Reactivex的Consumer接口

我是ReactiveX的新手.我是从源代码中学习的.一切都很清楚但突然间我得到了一个名为"Consumer"的词,它是一个界面.它被用来代替观察者.有人能告诉我它到底是做什么的吗?

我遵循了几个链接,但他们只是说了一个语句Consumer是一个接受单个值的功能接口(回调). 我想知道它的确切工作.

  1. 它是什么?
  2. 我们为什么需要它?
  3. 如何使用它?
  4. 它取代了观察者吗?如果是,如何以及为什么?

android rx-java retrofit rx-android reactivex

12
推荐指数
2
解决办法
5704
查看次数

Spring Webflux Async PostgreSQL Publisher在第一个结果后停止

我正在尝试用反应异步postgres-async-driver替换PostgreSQL数据库轮询并将新插入的行流式传输到Spring 5 Webflux Reactive websocket客户端,例如Josh Long 在这里演示并基于SébastienDeleuze的 spring-reactive-playground的很棒的示例.

Publisher获得第一个row,但后来没有返回后续行.是问题,我Observable,我Publisher,或者我如何使用Postgres的,异步驱动程序Db

public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
    return
        // com.github.pgasync.Db
        db.queryRows(sql)
        // ~RowMapper method
        .map(row -> mapRowToDto(row))
        // serialize dto to String for websocket
        .map(dto -> { return objectMapper.writeValueAsString(dto); })
        // finally, write to websocket session 
        .map(str -> { return session.textMessage((String) str);
        });
}
Run Code Online (Sandbox Code Playgroud)

然后,ObservableWebSocketHandler使用RxReactiveStream.toPublisher转换器连接到我:

@Bean
WebSocketHandler dbWebSocketHandler() {
    return …
Run Code Online (Sandbox Code Playgroud)

postgresql spring websocket rx-java spring-webflux

12
推荐指数
1
解决办法
2466
查看次数

RxJava作为事件总线?

我开始学习RxJava,到目前为止我喜欢它.我有一个片段与按钮单击时的活动进行通信(用新片段替换当前片段).谷歌建议片段的接口与活动进行通信,但是它太冗长了,我试图使用普遍适用的广播接收器,但它有缺点.

因为我正在学习RxJava,我想知道从片段到活动(或片段到片段)的沟通是否是一个很好的选择?如果是这样,那么使用RxJava进行此类通信的最佳方式是什么?我需要做事件总线像这样的一个,如果是那样的话,我应该让总线的一个实例,并在全球范围使用(与主题)?

android event-bus android-fragments rx-java

11
推荐指数
1
解决办法
2892
查看次数

Rxjava AndroidSchedulers.mainThread()意味着UI线程?

我的代码是这样的:

.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe ({
    adapter.notifyDataSetChanged()
})
Run Code Online (Sandbox Code Playgroud)

但我收到一个错误:只有创建视图层次结构的原始线程才能触及其视图. 所以我把它改成:

.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe ({
        runOnUiThread(Runnable {
            adapter.notifyDataSetChanged()
 })
 }
Run Code Online (Sandbox Code Playgroud)

这很有道理.所以我很困惑.我曾经想过.observeOn(AndroidSchedulers.mainThread())订阅块中的代码在ui线程上运行,但我是如何得到这个错误的?

android rx-java

11
推荐指数
1
解决办法
3440
查看次数

互联网检查,使用MVP,RX和Retrofit时的位置

我已经完成了这个这篇文章.所以我真的同意主持人不应该知道Android特定事情的第二篇文章.所以我在想的是将互联网检查放在服务层.我正在使用Rx Java进行网络调用,所以我可以在进行服务调用之前进行网络检查,这样我就需要手动抛出和IOException,因为我需要在网络不可用时在视图上显示错误页面,另一个选择是我为没有互联网创建自己的错误类

Observable<PaginationResponse<Notification>> response = Observable.create(new Observable.OnSubscribe<PaginationResponse<Notification>>() {
            @Override
            public void call(Subscriber<? super PaginationResponse<Notification>> subscriber) {
                if (isNetworkConnected()) {
                    Call<List<Notification>> call = mService.getNotifications();
                    try {
                        Response<List<Notification>> response = call.execute();
                        processPaginationResponse(subscriber, response);
                    } catch (IOException e) {
                        e.printStackTrace();
                        subscriber.onError(e);
                    }
                } else {
//This is I am adding manually
                    subscriber.onError(new IOException);
                }
                subscriber.onCompleted();
            }
        });
Run Code Online (Sandbox Code Playgroud)

我的另一种方法是将拦截器添加到OkHttpClient并将其设置为改装

OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
        builder.addInterceptor(new Interceptor() {
            @Override
            public Response intercept(Chain chain) throws IOException {
                if (!isNetworkConnected()) {
                    throw new …
Run Code Online (Sandbox Code Playgroud)

architecture mvp android rx-java retrofit2

11
推荐指数
1
解决办法
4866
查看次数

单元测试 - 已订阅验证Observable

我有这样的java代码

 mDataManager.getObservable("hello").subscribe( subscriber );
Run Code Online (Sandbox Code Playgroud)

我想要verify以下Observable.subscribe()

我试图模仿getObservable()verify

 Observable<Response> res = mock(Observable.class);
 when(mDataManager.getObservable("hello")).thenReturn(res);
 verify(res).subscribe();
Run Code Online (Sandbox Code Playgroud)

但是有一个错误

Caused by: java.lang.IllegalStateException: onSubscribe function can not be null.
at rx.Observable.subscribe(Observable.java:8167)
at rx.Observable.subscribe(Observable.java:8158)
at rx.Observable.subscribe(Observable.java:7962)
....
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: omni.neo.hk.omniapiservice.v4.model.external.UserLoginBean.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109)
at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:187)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:82)
... 48 more
Run Code Online (Sandbox Code Playgroud)

我认为mock这里不可能是一个Observable,但是如果没有一个模拟的Observable我就做不到verify(res).subscribe()

在这种情况下的任何建议?

java unit-testing mockito observable rx-java

11
推荐指数
1
解决办法
4183
查看次数

CursorLoader的onLoadFinished回调中的RxJava2

要从数据库中获取数据,我CursorLoader在应用程序中使用.一旦onLoadFinished()回调方法调用app的逻辑,就将Cursor对象转换为List业务模型需求中的对象.如果有大量数据,那么转换(繁重操作)需要一些时间.这会降低UI线程的速度.我尝试Thread使用RxJava2传递Cursor对象在非UI中启动转换,但得到Exception:

Caused by: android.database.StaleDataException: Attempting to access a closed CursorWindow.Most probable cause: cursor is deactivated prior to calling this method.
Run Code Online (Sandbox Code Playgroud)

这是Fragment代码的一部分:

@Override
    public Loader<Cursor> onCreateLoader(int id, Bundle args) {
        QueryBuilder builder;
        switch (id) {
            case Constants.FIELDS_QUERY_TOKEN:
                builder = QueryBuilderFacade.getFieldsQB(activity);
                return new QueryCursorLoader(activity, builder);
            default:
                return null;
        }
    }

    @Override
    public void onLoadFinished(Loader<Cursor> loader, Cursor cursor) {
        if (cursor.getCount() > 0) {
            getFieldsObservable(cursor) …
Run Code Online (Sandbox Code Playgroud)

sqlite android android-cursorloader rx-java rx-android

11
推荐指数
2
解决办法
650
查看次数