标签: rx-java2

RxJava在空时发出错误

如果Observable在完成之前没有发出一个值,我想"抛出"自定义错误.

让我试着举个例子:

Observable<SomeClass> stream = ...

stream
.filter(...)
.singleOrError(new MyCustomException())
Run Code Online (Sandbox Code Playgroud)

所以我有一个SomeClass对象流.如果fitler()不发出一个元素,我想发出自定义错误.

error-handling exception rx-java rx-java2

4
推荐指数
1
解决办法
5402
查看次数

RxJava2: Alternative to rx.Observable method first(predicate)

I wanted to use RxJava but can't come up with alternative for method

public final Observable<T> first(Func1<? super T,java.lang.Boolean> predicate)
Run Code Online (Sandbox Code Playgroud)

in RxJava2.

What I want to do is following:

return io.reactivex.Observable
    .concat(source1, source2, source3, source4)
    .first(obj -> obj != null);
Run Code Online (Sandbox Code Playgroud)

Parameters source1 to source4 are io.reactivex.Observable instances that I concatenate and I want resulting Observable to emit only the very first item that isn't null but this of course fails because io.reactivex.Observable doesn't have method first(Func1 predicate) like rx.Observable. …

java rx-java reactivex rx-java2

4
推荐指数
1
解决办法
1239
查看次数

为什么Flowables不是Observables

为什么Flowables不能观察到; 可观察接口几乎是Flowable的一个子集,它们的实现几乎相同.

为什么它们不实现通用接口,所以我们可以直接将Flowable转换为Observable?

rx-java reactive rx-java2

4
推荐指数
2
解决办法
605
查看次数

Rx2 blockingFirst()不起作用

我在rx世界中介绍,我想了解为什么在订阅newThread时阻塞不起作用.例如:

这是有效的:

int i = Observable.fromArray(1,2,3,4).blockingFirst();
Run Code Online (Sandbox Code Playgroud)

这不起作用:

int i = Observable.just(1,2,3,4)
      .subscribeOn(Schedulers.newThread())
      .observeOn(AndroidSchedulers.mainThread()).blockingFirst();
Run Code Online (Sandbox Code Playgroud)

如果有可能使第二种情况起作用.

谢谢 ;)

rx-java rx-android reactivex rx-java2

4
推荐指数
1
解决办法
2292
查看次数

为什么在重放()之前调用publish()很重要

我读过很多关于publish()和replay()的rx例子.在所有教程中,他们都说在调用replay()之前调用publish()很重要.为什么会这样?如果你调用replay()并且从不调用publish()会发生什么?似乎replay()运算符会订阅源observable并开始缓存.然后任何人都会获得相同的完整数据流.我可以看到publish()改变游戏的唯一原因是它会延迟订阅源observable,直到调用connect().这是正确的吗?

reactive-programming rx-java rx-android rx-java2

4
推荐指数
1
解决办法
748
查看次数

从数据库+网络加载数据(Room + Retrofit + RxJava2)

我有一个示例API请求,它返回用户监视列表的列表.我想在用户加载监视列表屏幕时实现以下流程:

  1. 立即从DB缓存加载数据.(cacheWatchList)

  2. 在后台启动RetroFit网络呼叫.

    一世.onSuccess返回apiWatchList
    ii.onError返回cacheWatchList

  3. Diff cacheWatchListvs.apiWatchList

    一世.相同 - >一切都很好,因为数据已经显示给用户什么都不做.

    II.不同 - >保存apiWatchList到本地商店并发apiWatchList送到下游.

到目前为止我做了什么?

Watchlist.kt

data class Watchlist(
  val items: List<Repository> = emptyList()
)
Run Code Online (Sandbox Code Playgroud)

LocalStore.kt(Android室)

  fun saveUserWatchlist(repositories: List<Repository>): Completable {    
    return Completable.fromCallable {      
      watchlistDao.saveAllUserWatchlist(*repositories.toTypedArray())
    }
  }
Run Code Online (Sandbox Code Playgroud)

RemoteStore.kt(改造api调用)

  fun getWatchlist(userId: UUID): Single<Watchlist?> {
    return api.getWatchlist(userId)
  }
Run Code Online (Sandbox Code Playgroud)

DataManager.kt

  fun getWatchlist(userId: UUID): Flowable<List<Repository>?> {
    val localSource: Single<List<Repository>?> =
      localStore.getUserWatchlist()
        .subscribeOn(scheduler.computation)

    val remoteSource: Single<List<Repository>> …
Run Code Online (Sandbox Code Playgroud)

android retrofit2 rx-java2 android-room

4
推荐指数
1
解决办法
2103
查看次数

Kotlin和RxJava类型推断失败

我正在尝试在发生计时器的情况下在Observable上实现retryWhen,但是在IDE Android Studio 3.0中有下划线的奇怪错误

这是代码:

rxRssiRepository.onRssiUpdate() //returns Observable<RssiEvent>
        .timeout(10, TimeUnit.MILLISECONDS)
        .retryWhen { t: Observable<Throwable> ->
            t.flatMap { error: Throwable ->
                if (error is TimeoutException) {
                    stopLocationUpdates()
                    Log.v("TIMEOUT", "TIMEOUT RSSI EVENTS")
                    Observable.just(Observable.empty())
                } else {
                    Observable.error(error)
                }
            }
        }
        .concatMap { t: RssiEvent ->
            appendRssiEvent(t)
        }
        .publish()
Run Code Online (Sandbox Code Playgroud)

IDE在.flatMap运算符上用红色下划线表示:

类型推断失败:没有足够的信息来推断参数R in

有趣的flatMap(mapper:(((t:Throwable)?ObservableSource)!):可观察!请明确指定。

与“ if”运算符相同:

控制流表达式的类型推断失败。请明确指定其类型

对于操作员.just:

类型推断失败:没有足够的信息来有趣地推断参数T(项目:T!):可观察!请明确指定。

对于运算符.empty:

类型推断失败:没有足够的信息来推断参数T

有趣(项目:T!):可观察!请明确指定。

对于.error运算符:

类型推断失败:没有足够的信息来推断参数T

有趣的错误(异常:Throwable!):可观察!请明确指定。

如何解决这个问题?

我的意思是,这段代码也位于Java中: 如何添加超时以检测Observable一段时间未发出

但是当我将其转换为Kotlin时,它给了我所描述的情况问题

type-inference kotlin rx-java rx-java2

4
推荐指数
1
解决办法
3929
查看次数

如何使用RxJava TestObserver断言两个字符串列表?

在这个测试中,这两个字符串列表是相同的,但assertValue方法告诉我它们不相等,因为它们有不同的内存地址.如何使其针对数组中的实际字符串进行测试,以便以下内容通过?

@Test
public void shouldReturnStringList() {;
    String[] strings = {"Hello", "World"};
    String[] strings2 = {"Hello", "World"};
    Observable<String[]> stringsObservable = Observable.just(strings);

    TestObserver<String[]> testObserver = new TestObserver<>();
    stringsObservable.subscribe(testObserver);

    testObserver.assertValue(strings2);
}
Run Code Online (Sandbox Code Playgroud)

失败的测试消息:

java.lang.AssertionError:预期:[Ljava.lang.String; @ 5ebec15(class:String []),Actual:[Ljava.lang.String; @ 21bcffb5(class:String [])(latch = 0,values = 1,错误= 0,完成= 1)

java arrays unit-testing rx-java2

4
推荐指数
1
解决办法
724
查看次数

链API调用中的RxJava错误处理

我最近在玩Rxjava尝试实现一系列事件(Api callas /数据库操作),并且在处理错误时似乎遇到了障碍.

这就是我想要做的.我正在调用Api来检查数据库中是否存在用户.基于我得到的响应,我试图使用rxjava链接一些序列.下图可能会解释得更好.

                          checkUser()
                         /          \
                       No           Yes
                       /              \
            createUserRemote()       FetchUserNotesRemote()
                      |                    |
                    End               SaveUserNotesLocal()
                                            |
                                           End
Run Code Online (Sandbox Code Playgroud)

我能够使用以下代码将checkUser() - > FetchUserNotesRemote() - > SaveUserNotesLocal()序列链接在一​​起.

checkUser()
            .flatMap(id -> {return fetchData(id);})
            .flatMap(notesResponseObject -> {return saveFetchedData(notesResponseObject);})
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onSuccess(Integer integer) {
                    //handle onsuccess here
                }

                @Override
                public void onError(Throwable e) {
                    //handle errors here
                }
            });
Run Code Online (Sandbox Code Playgroud)

我主要试图解决的问题.

  • 我无法弄清楚如何处理checkUser()返回
    404 http状态的情况.因为当发生这种情况时,
    调用订阅者的onError 方法,在我看来应该发生什么.我如何处理它,以便当我从API获得错误(404)响应,而不是执行FetchUserNotesRemote()和SaveUserNotesLocal()时,我执行不同的事件链?
  • 我不确定的另一件事是,如果链中的任何一个observable都有一个错误,那么订阅者的onError方法如何知道哪个observable称之为?

java android rx-java2

4
推荐指数
1
解决办法
2955
查看次数

在调用dispose()之前是否需要检查isDisposed()?

我正在学习RxJava,我注意到很多示例代码isDisposed()在调用之前都会进行检查dispose()。当我呼吁dispose()已经处置的时候,我没有发现任何问题Disposable

所以我的问题是,我需要isDisposed()支票吗?isDisposed()在处置前是否有应检查的情况?首先进行检查的利弊是什么?

android rx-java2

4
推荐指数
1
解决办法
1394
查看次数