我有5个Observable被链接的s flatMap.如果前四个Observables产生一个Exception我想传播不同类型Exception的第五个.
它是如何实现的?
谢谢.
PS我已经提出了这个尚未经过测试的解决方案:
flatMap(
// onNext
new Func1<BoolResponse, Observable<?>>() {
@Override
public Observable<?> call(BoolResponse boolResponse) {
return request;
}
},
// onError
new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return Observable.error(new SomethingWentWrong());
}
},
// onCompleted
new Func0<Observable<?>>() {
@Override
public Observable<?> call() {
return request;
}
});
Run Code Online (Sandbox Code Playgroud)
你认为没关系吗?
我还在弄清楚RxJava并使用它来与Retrofit 2做一些网络工作.我们已经尝试了几天了,而且现在这些代码看起来更具可读性但却遇到了一个我似乎无法想象的问题周围.
我正在尝试执行登录(返回API令牌),然后使用此令牌获取同一链中的所有初始数据,以便链的输出是令牌+数据.要做到这一点,我用一个调用我的API服务
apiClient
.login()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(token -> getData(token))
.subscribe(new Subscrber<Bundle>() {...});
Run Code Online (Sandbox Code Playgroud)
这似乎很好,但我也想在启动和停止链条时显示进度条.所以我也添加了一个.doOnSubscribe()和一个.doOnUnsubscribe().但是我注意到在更改方向后,我试图隐藏进度条的片段始终为null.
所以我搜索并发现了RxLifecycle lib,看起来它会有所帮助,我现在.cache()并取消订阅事件链.但我不知道如何onCreate()在此之后再次订阅同一事件?我想我错过了一些非常基本的东西,并希望对此有任何帮助.
我写了一个订阅者,当谷歌地图被触发时会OnCameraChangeListener被触发.
Observable.create(new Observable.OnSubscribe<LatLng>()
{
@Override
public void call(Subscriber<? super LatLng> subscriber)
{
if (!subscriber.isUnsubscribed())
{
mMap.setOnCameraChangeListener(cameraPosition ->
subscriber.onNext(cameraPosition.target));
}
}
}).subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.onErrorResumeNext(Observable.<LatLng>empty())
.debounce(1, TimeUnit.SECONDS)
.subscribe(position -> {
if (position.latitude != 0 && position.longitude != 0)
{
updateLocationMarker(position);
}
});
Run Code Online (Sandbox Code Playgroud)
我正在更新位置标记,如下所示:
private void updateLocationMarker(LatLng center)
{
locationMarkertext.setText("Lat:" + center.latitude + " Long:" + center.longitude);
//locationMarkerLayout.setVisibility(View.VISIBLE);
}
Run Code Online (Sandbox Code Playgroud)
即使我的代码说要运行AndroidSchedulers.mainThread()我也会收到此错误:
引发者:rx.exceptions.OnErrorNotImplementedException:只有创建视图层次结构的原始线程才能触及其视图.
有人可以帮我理解我的方法有什么问题
背景
public Observable<List<Foo>> search(SearchView searchView) {
return RxSearchView.queryTextChanges(searchView)
.filter(charSequence -> !TextUtils.isEmpty(charSequence))
.throttleLast(100, TimeUnit.MILLISECONDS)
.debounce(200, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.mainThread())
.flatMap(this::performSearch) //Search the DB
.onErrorResumeNext(this::doSomething);
}
Run Code Online (Sandbox Code Playgroud)
我试图用AndroidJUnit4跑步者测试上面的方法Mocktio.
@Test
public void testSearchCallsDataManager_WhenCalled() {
String input = "abc";
when(mockSearchView.getQuery()).thenReturn(input);
searchRequestManager.search(mockSearchView).subscribe(testSubscriber); //Using standard TestSubscriber
testSubscriber.assertNoErrors();
testSubscriber.assertNotCompleted();
verify(mockDataManager).getFoos(input);
}
Run Code Online (Sandbox Code Playgroud)
问题
我尝试过使用a mockSearchView和real SearchView.
mockSearchView = mock(SearchView.class);
searchView = new SearchView(InstrumentationRegistry.getContext(), null);
searchView = new SearchView(InstrumentationRegistry.getTargetContext(), null);
Run Code Online (Sandbox Code Playgroud)
在实例化期间,实际对象在测试运行时会导致不同的异常.模拟对象在执行期间似乎没有任何效果.
更新
为清楚起见:理想情况下,如果我可以模拟SearchView,那将是很好的,因为我想测试发生什么后发生的事情以及使用正确的输入调用performSearch方法.
在android中我使用Timer来执行每5秒重复一次的任务,并以1秒的方式以这种方式启动:
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
// Here is the repeated task
}
}, /*Start after*/1000, /*Repeats every*/5000);
// here i stop the timer
timer.cancel();
Run Code Online (Sandbox Code Playgroud)
这个计时器将重复直到我打电话 timer.cancel()
我正在学习RxAava并使用RxAndroid扩展
所以我在互联网上找到了这个代码,我尝试了它并且它没有重复:
Observable.timer(3000, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
// here is the task that should repeat
}
});
Run Code Online (Sandbox Code Playgroud)
那么什么是RxJava中的Android定时器的替代品.
我想创建以下内容的observable:
我想使用BehaviorSubject<Boolean>as触发器并将此触发器绑定到activity onResume和onPause事件.(附加代码示例)
题
我已经设置了一些东西,但它没有按预期工作.我用它如下:
Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
Run Code Online (Sandbox Code Playgroud)
目前的问题是,Variant 1应该可以正常工作,但有时,事件只是没有发出 - 阀门没有发射,直到阀门一切正常工作(可能是一个穿线问题......)!解决方案2更简单,似乎有效,但我不确定它是否真的更好,我不这么认为.我实际上不确定,为什么解决方案有时会失败,所以我不确定解决方案2是否解决了(目前对我不知道)问题...
有人可以告诉我可能是什么问题或者简单的解决方案应该可靠地工作吗?或者给我一个可靠的解决方案?
码
RxValue
https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08
RXPauser功能
public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
return observable -> pauser(observable, pauser);
}
private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
// this observable buffers all items that are emitted while …Run Code Online (Sandbox Code Playgroud) 我想使用rxjava在后台运行一个方法.我不关心结果.
void myHeavyMethod() { (...) }
Run Code Online (Sandbox Code Playgroud)
到目前为止,我唯一的解决方案是将返回类型修改为例如boolean.
boolean myHeavyMethod() { (...) return true; }
Run Code Online (Sandbox Code Playgroud)
然后我跑:
Completable.defer(() -> Completable.fromCallable(this::myHeavyMethod))
.subscribeOn(Schedulers.computation())
.subscribe(
() -> {},
throwable -> Log.e(TAG, throwable.getMessage(), throwable)
);
Run Code Online (Sandbox Code Playgroud)
有没有办法保持void返回类型?
使用Retrofit 1,我们用来模拟Web服务并模拟网络延迟,如下所示:
MockRestAdapter mockRestAdapter = MockRestAdapter.from(restAdapter);
return mockRestAdapter.create(MyService.class, new MyServiceMock());
Run Code Online (Sandbox Code Playgroud)
MyService服务接口在哪里(将响应作为Rx Observables返回),并且MyServiceMock是实现此接口的类.
在Retrofit 2.0.0-beta3中,有一个全新的模拟系统(参见:https://github.com/square/retrofit/pull/1343)尚未记录.当我尝试类似的东西,我得到:
MockRetrofit mockRetrofit = new MockRetrofit.Builder(retrofit).build();
BehaviorDelegate<AuthService> delegate = mockRetrofit.create(MyService.class);
Run Code Online (Sandbox Code Playgroud)
我该如何转接电话MyServiceMock?
我一直在寻找RxJava1 https://github.com/ReactiveX/RxJava/releases和RxJava2 https://github.com/ReactiveX/RxJava/wiki/Reactive-Streams的文档,看起来与RxJava 的独特之处在于2有Java Stream.
还有其他不同吗?
我一直在使用1.1.3版本,但我不确定是否值得将其转移到RxJava2,因为我们已经在代码中使用了Java 8流
问候.
我正在学习RxJava.为此,我在RxJava上关注了droidcon谈话视频.教练为他正在使用的项目提供了回购链接.当我尝试在我的机器上构建项目时,我克隆了repo.我收到这个错误
错误:无法删除旧的javaCompile操作,可能是类名已更改?请提交错误报告,告知您正在使用的gradle版本.
这是gradle.build文件
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'me.tatarka:gradle-retrolambda:2.5.0'
}
}
repositories {
mavenCentral()
maven { url "https://github.com/alter-ego/advanced-android-logger/raw/develop/releases/" }
}
apply plugin: 'retrolambda'
apply plugin: 'com.android.application'
android {
compileSdkVersion 23
buildToolsVersion "25.0.0"
defaultConfig {
applicationId "com.packtpub.apps.rxjava_essentials"
minSdkVersion 16
targetSdkVersion 22
versionCode 1
versionName "1.0"
jackOptions {
enabled true
}
}
buildTypes {
release {
minifyEnabled false
proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
}
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
lintOptions {
disable 'InvalidPackage' …Run Code Online (Sandbox Code Playgroud) rx-java ×10
android ×6
java ×3
rx-android ×3
gradle ×1
java-8 ×1
observable ×1
retrofit ×1
retrofit2 ×1
rx-java2 ×1
searchview ×1