标签: rx-java

RxJava链接可观察量和错误处理(自定义异常传播)

我有5个Observable被链接的s flatMap.如果前四个Observables产生一个Exception我想传播不同类型Exception的第五个.

它是如何实现的?

谢谢.

PS我已经提出了这个尚未经过测试的解决方案:

flatMap(
        // onNext
        new Func1<BoolResponse, Observable<?>>() {
            @Override
            public Observable<?> call(BoolResponse boolResponse) {
                return request;
            }
        },
        // onError
        new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable throwable) {
                return Observable.error(new SomethingWentWrong());
            }
        },
        // onCompleted
        new Func0<Observable<?>>() {
            @Override
            public Observable<?> call() {
                return request;
            }
});
Run Code Online (Sandbox Code Playgroud)

你认为没关系吗?

rx-java

16
推荐指数
1
解决办法
1万
查看次数

RxJava在活动恢复后重新订阅事件

我还在弄清楚RxJava并使用它来与Retrofit 2做一些网络工作.我们已经尝试了几天了,而且现在这些代码看起来更具可读性但却遇到了一个我似乎无法想象的问题周围.

我正在尝试执行登录(返回API令牌),然后使用此令牌获取同一链中的所有初始数据,以便链的输出是令牌+数据.要做到这一点,我用一个调用我的API服务

apiClient
    .login()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(token -> getData(token))
    .subscribe(new Subscrber<Bundle>() {...});
Run Code Online (Sandbox Code Playgroud)

这似乎很好,但我也想在启动和停止链条时显示进度条.所以我也添加了一个.doOnSubscribe()和一个.doOnUnsubscribe().但是我注意到在更改方向后,我试图隐藏进度条的片段始终为null.

所以我搜索并发现了RxLifecycle lib,看起来它会有所帮助,我现在.cache()并取消订阅事件链.但我不知道如何onCreate()在此之后再次订阅同一事件?我想我错过了一些非常基本的东西,并希望对此有任何帮助.

android android-lifecycle rx-java retrofit

16
推荐指数
1
解决办法
2314
查看次数

rxandroid要求在ui线程上运行,即使它是在AndroidSchedulers.mainThread()上订阅的

我写了一个订阅者,当谷歌地图被触发时会OnCameraChangeListener被触发.

Observable.create(new Observable.OnSubscribe<LatLng>()
    {
        @Override
        public void call(Subscriber<? super LatLng> subscriber)
        {
            if (!subscriber.isUnsubscribed())
            {
                mMap.setOnCameraChangeListener(cameraPosition ->
                        subscriber.onNext(cameraPosition.target));
            }
        }
    }).subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .onErrorResumeNext(Observable.<LatLng>empty())
            .debounce(1, TimeUnit.SECONDS)
            .subscribe(position -> {
                if (position.latitude != 0 && position.longitude != 0)
                {
                    updateLocationMarker(position);
                }
            });
Run Code Online (Sandbox Code Playgroud)

我正在更新位置标记,如下所示:

private void updateLocationMarker(LatLng center)
{
    locationMarkertext.setText("Lat:" + center.latitude + " Long:" + center.longitude);
    //locationMarkerLayout.setVisibility(View.VISIBLE);
}
Run Code Online (Sandbox Code Playgroud)

即使我的代码说要运行AndroidSchedulers.mainThread()我也会收到此错误:

引发者:rx.exceptions.OnErrorNotImplementedException:只有创建视图层次结构的原始线程才能触及其视图.

有人可以帮我理解我的方法有什么问题

android rx-java

16
推荐指数
1
解决办法
4880
查看次数

测试RxBinding RxSearchView

背景

public Observable<List<Foo>> search(SearchView searchView) {

    return RxSearchView.queryTextChanges(searchView)
            .filter(charSequence -> !TextUtils.isEmpty(charSequence))
            .throttleLast(100, TimeUnit.MILLISECONDS)
            .debounce(200, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(AndroidSchedulers.mainThread())
            .flatMap(this::performSearch) //Search the DB
            .onErrorResumeNext(this::doSomething);
}
Run Code Online (Sandbox Code Playgroud)

我试图用AndroidJUnit4跑步者测试上面的方法Mocktio.

@Test
public void testSearchCallsDataManager_WhenCalled() {

    String input = "abc";

    when(mockSearchView.getQuery()).thenReturn(input);

    searchRequestManager.search(mockSearchView).subscribe(testSubscriber); //Using standard TestSubscriber

    testSubscriber.assertNoErrors();
    testSubscriber.assertNotCompleted();
    verify(mockDataManager).getFoos(input);
}
Run Code Online (Sandbox Code Playgroud)

问题

我尝试过使用a mockSearchView和real SearchView.

mockSearchView = mock(SearchView.class);
searchView = new SearchView(InstrumentationRegistry.getContext(), null);
searchView = new SearchView(InstrumentationRegistry.getTargetContext(), null);
Run Code Online (Sandbox Code Playgroud)

在实例化期间,实际对象在测试运行时会导致不同的异常.模拟对象在执行期间似乎没有任何效果.

更新

为清楚起见:理想情况下,如果我可以模拟SearchView,那将是很好的,因为我想测试发生什么后发生的事情以及使用正确的输入调用performSearch方法.

android android-appcompat searchview rx-java rx-android

16
推荐指数
1
解决办法
847
查看次数

RxJava计时器可以永久重复,并且可以随时重新启动和停止

在android中我使用Timer来执行每5秒重复一次的任务,并以1秒的方式以这种方式启动:

    Timer timer = new Timer();
    timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            // Here is the repeated task
        }
    }, /*Start after*/1000, /*Repeats every*/5000);

    // here i stop the timer
    timer.cancel();
Run Code Online (Sandbox Code Playgroud)

这个计时器将重复直到我打电话 timer.cancel()

我正在学习RxAava并使用RxAndroid扩展

所以我在互联网上找到了这个代码,我尝试了它并且它没有重复:

Observable.timer(3000, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
             // here is the task that should repeat
        }
    });
Run Code Online (Sandbox Code Playgroud)

那么什么是RxJava中的Android定时器的替代品.

android observer-pattern rx-java rx-android

16
推荐指数
2
解决办法
2万
查看次数

RXJava - 创建一个可暂停的可观察对象(例如缓冲区和窗口)

我想创建以下内容的observable:

  • 缓冲所有项目,同时暂停
  • 立即发出物品,而不是暂停
  • 暂停/恢复触发器必须来自另一个可观察者
  • 它必须保存以供不在主线程上运行的observable使用,并且必须保存以更改主线程中的暂停/恢复状态

我想使用BehaviorSubject<Boolean>as触发器并将此触发器绑定到activity onResumeonPause事件.(附加代码示例)

我已经设置了一些东西,但它没有按预期工作.我用它如下:

Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

目前的问题是,Variant 1应该可以正常工作,但有时,事件只是没有发出 - 阀门没有发射,直到阀门一切正常工作(可能是一个穿线问题......)!解决方案2更简单,似乎有效,但我不确定它是否真的更好,我不这么认为.我实际上不确定,为什么解决方案有时会失败,所以我不确定解决方案2是否解决了(目前对我不知道)问题...

有人可以告诉我可能是什么问题或者简单的解决方案应该可靠地工作吗?或者给我一个可靠的解决方案?

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser功能

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
    return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
    // this observable buffers all items that are emitted while …
Run Code Online (Sandbox Code Playgroud)

android observable rx-java

16
推荐指数
1
解决办法
958
查看次数

在后台运行void方法

我想使用rxjava在后台运行一个方法.我不关心结果.

void myHeavyMethod() { (...) }
Run Code Online (Sandbox Code Playgroud)

到目前为止,我唯一的解决方案是将返回类型修改为例如boolean.

boolean myHeavyMethod() { (...) return true; }
Run Code Online (Sandbox Code Playgroud)

然后我跑:

Completable.defer(() -> Completable.fromCallable(this::myHeavyMethod))
        .subscribeOn(Schedulers.computation())
        .subscribe(
                () -> {},
                throwable -> Log.e(TAG, throwable.getMessage(), throwable)
        );
Run Code Online (Sandbox Code Playgroud)

有没有办法保持void返回类型?

rx-java rx-java2

16
推荐指数
1
解决办法
5689
查看次数

如何使用Retrofit 2.0和新的MockRetrofit类来模拟服务?

使用Retrofit 1,我们用来模拟Web服务并模拟网络延迟,如下所示:

MockRestAdapter mockRestAdapter = MockRestAdapter.from(restAdapter);
return mockRestAdapter.create(MyService.class, new MyServiceMock());
Run Code Online (Sandbox Code Playgroud)

MyService服务接口在哪里(将响应作为Rx Observables返回),并且MyServiceMock是实现此接口的类.

在Retrofit 2.0.0-beta3中,有一个全新的模拟系统(参见:https://github.com/square/retrofit/pull/1343)尚未记录.当我尝试类似的东西,我得到:

MockRetrofit mockRetrofit = new MockRetrofit.Builder(retrofit).build();
BehaviorDelegate<AuthService> delegate = mockRetrofit.create(MyService.class);
Run Code Online (Sandbox Code Playgroud)

我该如何转接电话MyServiceMock

java rx-java retrofit2

15
推荐指数
1
解决办法
1万
查看次数

RxJava1和RxJava2之间的差异

我一直在寻找RxJava1 https://github.com/ReactiveX/RxJava/releases和RxJava2 https://github.com/ReactiveX/RxJava/wiki/Reactive-Streams的文档,看起来与RxJava 的独特之处在于2有Java Stream.

还有其他不同吗?

我一直在使用1.1.3版本,但我不确定是否值得将其转移到RxJava2,因为我们已经在代码中使用了Java 8流

问候.

java java-8 rx-java

15
推荐指数
2
解决办法
8123
查看次数

无法删除旧的javaCompile操作,可能是类名已更改

我正在学习RxJava.为此,我在RxJava上关注了droidcon谈话视频.教练为他正在使用的项目提供了回购链接.当我尝试在我的机器上构建项目时,我克隆了repo.我收到这个错误

错误:无法删除旧的javaCompile操作,可能是类名已更改?请提交错误报告,告知您正在使用的gradle版本.

这是gradle.build文件

    buildscript {
    repositories {
        mavenCentral()
    }

    dependencies {
        classpath 'me.tatarka:gradle-retrolambda:2.5.0'
    }
}

repositories {
    mavenCentral()
    maven { url "https://github.com/alter-ego/advanced-android-logger/raw/develop/releases/" }
}

apply plugin: 'retrolambda'
apply plugin: 'com.android.application'

android {
    compileSdkVersion 23
    buildToolsVersion "25.0.0"

    defaultConfig {
        applicationId "com.packtpub.apps.rxjava_essentials"
        minSdkVersion 16
        targetSdkVersion 22
        versionCode 1
        versionName "1.0"
        jackOptions {
            enabled true
        }
    }

    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }

    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }

    lintOptions {
        disable 'InvalidPackage' …
Run Code Online (Sandbox Code Playgroud)

java android gradle rx-java rx-android

15
推荐指数
2
解决办法
1158
查看次数