如果Observable
在完成之前没有发出一个值,我想"抛出"自定义错误.
让我试着举个例子:
Observable<SomeClass> stream = ...
stream
.filter(...)
.singleOrError(new MyCustomException())
Run Code Online (Sandbox Code Playgroud)
所以我有一个SomeClass对象流.如果fitler()
不发出一个元素,我想发出自定义错误.
I wanted to use RxJava but can't come up with alternative for method
public final Observable<T> first(Func1<? super T,java.lang.Boolean> predicate)
Run Code Online (Sandbox Code Playgroud)
in RxJava2.
What I want to do is following:
return io.reactivex.Observable
.concat(source1, source2, source3, source4)
.first(obj -> obj != null);
Run Code Online (Sandbox Code Playgroud)
Parameters source1 to source4 are io.reactivex.Observable
instances that I concatenate and I want resulting Observable to emit only the very first item that isn't null but this of course fails because io.reactivex.Observable
doesn't have method first(Func1 predicate)
like rx.Observable
. …
为什么Flowables不能观察到; 可观察接口几乎是Flowable的一个子集,它们的实现几乎相同.
为什么它们不实现通用接口,所以我们可以直接将Flowable转换为Observable?
我在rx世界中介绍,我想了解为什么在订阅newThread时阻塞不起作用.例如:
这是有效的:
int i = Observable.fromArray(1,2,3,4).blockingFirst();
Run Code Online (Sandbox Code Playgroud)
这不起作用:
int i = Observable.just(1,2,3,4)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).blockingFirst();
Run Code Online (Sandbox Code Playgroud)
如果有可能使第二种情况起作用.
谢谢 ;)
我读过很多关于publish()和replay()的rx例子.在所有教程中,他们都说在调用replay()之前调用publish()很重要.为什么会这样?如果你调用replay()并且从不调用publish()会发生什么?似乎replay()运算符会订阅源observable并开始缓存.然后任何人都会获得相同的完整数据流.我可以看到publish()改变游戏的唯一原因是它会延迟订阅源observable,直到调用connect().这是正确的吗?
我有一个示例API请求,它返回用户监视列表的列表.我想在用户加载监视列表屏幕时实现以下流程:
立即从DB缓存加载数据.(cacheWatchList
)
在后台启动RetroFit网络呼叫.
一世.onSuccess
返回apiWatchList
ii.onError
返回cacheWatchList
Diff cacheWatchList
vs.apiWatchList
一世.相同 - >一切都很好,因为数据已经显示给用户什么都不做.
II.不同 - >保存apiWatchList
到本地商店并发apiWatchList
送到下游.
到目前为止我做了什么?
Watchlist.kt
data class Watchlist(
val items: List<Repository> = emptyList()
)
Run Code Online (Sandbox Code Playgroud)
LocalStore.kt(Android室)
fun saveUserWatchlist(repositories: List<Repository>): Completable {
return Completable.fromCallable {
watchlistDao.saveAllUserWatchlist(*repositories.toTypedArray())
}
}
Run Code Online (Sandbox Code Playgroud)
RemoteStore.kt(改造api调用)
fun getWatchlist(userId: UUID): Single<Watchlist?> {
return api.getWatchlist(userId)
}
Run Code Online (Sandbox Code Playgroud)
DataManager.kt
fun getWatchlist(userId: UUID): Flowable<List<Repository>?> {
val localSource: Single<List<Repository>?> =
localStore.getUserWatchlist()
.subscribeOn(scheduler.computation)
val remoteSource: Single<List<Repository>> …
Run Code Online (Sandbox Code Playgroud) 我正在尝试在发生计时器的情况下在Observable上实现retryWhen,但是在IDE Android Studio 3.0中有下划线的奇怪错误
这是代码:
rxRssiRepository.onRssiUpdate() //returns Observable<RssiEvent>
.timeout(10, TimeUnit.MILLISECONDS)
.retryWhen { t: Observable<Throwable> ->
t.flatMap { error: Throwable ->
if (error is TimeoutException) {
stopLocationUpdates()
Log.v("TIMEOUT", "TIMEOUT RSSI EVENTS")
Observable.just(Observable.empty())
} else {
Observable.error(error)
}
}
}
.concatMap { t: RssiEvent ->
appendRssiEvent(t)
}
.publish()
Run Code Online (Sandbox Code Playgroud)
IDE在.flatMap运算符上用红色下划线表示:
类型推断失败:没有足够的信息来推断参数R in
有趣的flatMap(mapper:(((t:Throwable)?ObservableSource)!):可观察!请明确指定。
与“ if”运算符相同:
控制流表达式的类型推断失败。请明确指定其类型
对于操作员.just:
类型推断失败:没有足够的信息来有趣地推断参数T(项目:T!):可观察!请明确指定。
对于运算符.empty:
类型推断失败:没有足够的信息来推断参数T
有趣(项目:T!):可观察!请明确指定。
对于.error运算符:
类型推断失败:没有足够的信息来推断参数T
有趣的错误(异常:Throwable!):可观察!请明确指定。
如何解决这个问题?
我的意思是,这段代码也位于Java中: 如何添加超时以检测Observable一段时间未发出
但是当我将其转换为Kotlin时,它给了我所描述的情况问题
在这个测试中,这两个字符串列表是相同的,但assertValue方法告诉我它们不相等,因为它们有不同的内存地址.如何使其针对数组中的实际字符串进行测试,以便以下内容通过?
@Test
public void shouldReturnStringList() {;
String[] strings = {"Hello", "World"};
String[] strings2 = {"Hello", "World"};
Observable<String[]> stringsObservable = Observable.just(strings);
TestObserver<String[]> testObserver = new TestObserver<>();
stringsObservable.subscribe(testObserver);
testObserver.assertValue(strings2);
}
Run Code Online (Sandbox Code Playgroud)
失败的测试消息:
java.lang.AssertionError:预期:[Ljava.lang.String; @ 5ebec15(class:String []),Actual:[Ljava.lang.String; @ 21bcffb5(class:String [])(latch = 0,values = 1,错误= 0,完成= 1)
我最近在玩Rxjava尝试实现一系列事件(Api callas /数据库操作),并且在处理错误时似乎遇到了障碍.
这就是我想要做的.我正在调用Api来检查数据库中是否存在用户.基于我得到的响应,我试图使用rxjava链接一些序列.下图可能会解释得更好.
checkUser()
/ \
No Yes
/ \
createUserRemote() FetchUserNotesRemote()
| |
End SaveUserNotesLocal()
|
End
Run Code Online (Sandbox Code Playgroud)
我能够使用以下代码将checkUser() - > FetchUserNotesRemote() - > SaveUserNotesLocal()序列链接在一起.
checkUser()
.flatMap(id -> {return fetchData(id);})
.flatMap(notesResponseObject -> {return saveFetchedData(notesResponseObject);})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
//handle onsuccess here
}
@Override
public void onError(Throwable e) {
//handle errors here
}
});
Run Code Online (Sandbox Code Playgroud)
我主要试图解决的问题.
我正在学习RxJava,我注意到很多示例代码isDisposed()
在调用之前都会进行检查dispose()
。当我呼吁dispose()
已经处置的时候,我没有发现任何问题Disposable
。
所以我的问题是,我需要isDisposed()
支票吗?isDisposed()
在处置前是否有应检查的情况?首先进行检查的利弊是什么?
rx-java2 ×10
rx-java ×6
android ×3
java ×3
reactivex ×2
rx-android ×2
android-room ×1
arrays ×1
exception ×1
kotlin ×1
reactive ×1
retrofit2 ×1
unit-testing ×1