标签: rx-java

如何通过改造和rxjava取消任务

我有休息api.

@Get("/serveraction")
public Observable<String> myRequest(@Query("Data") String data);
Run Code Online (Sandbox Code Playgroud)

我知道,okhttp已取消功能(通过请求对象,按标签),但不知道如何使用它与改造和rxjava.使用改造和rxjava实现网络任务取消机制的最佳方法是什么?

reactive-programming rx-java retrofit okhttp

19
推荐指数
2
解决办法
9097
查看次数

Rx Java mergeDelayError无法按预期工作

我在RxAndroid中使用RxJava和Android应用程序.我正在使用mergeDelayError将两个逆向拟合网络调用组合成一个observable,如果发出一个,它将处理发出的项目,如果有的话,则处理错误.这不起作用,它只会在遇到错误时触发onError操作.现在为了测试这个我转移到一个非常简单的例子,当我有一个onError调用时,仍然不会调用successAction.见下面的例子.

Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
            )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .finallyDo(completeAction)
            .subscribe(successAction, errorAction);
Run Code Online (Sandbox Code Playgroud)

只有在使用两个成功的可观察对象时才会调用成功操作.我错过了mergeDelayError应该如何工作的东西?

编辑:

我发现,如果我删除了observeOn,subscribeOn一切都按预期工作.我需要指定线程和思想,这是使用Rx的重点.知道为什么指定那些Schedulers会破坏行为吗?

java android rx-java rx-android

19
推荐指数
2
解决办法
3873
查看次数

有没有类似Single.empty()的东西

我正在从Rx 1迁移到Rx 2,突然在阅读帖子时我发现Single应该是用于改装调用的可观察类型.

所以我决定尝试一下,在将我们的改装调用迁移到Rx 2时,我也将返回值更改为Single<whatever>.

现在的问题是,我们的一些测试模拟网络服务类似于:

when(userService.logout()).thenReturn(Observable.empty())
Run Code Online (Sandbox Code Playgroud)

正如您在迁移调用之前所看到的那样,我们过去只是通过告诉userServicemock返回一个空的observable 来完成流.

在迁移到Single调用的"版本"时,我们不再使用,Observable.empty()因为调用不返回Observable,但返回a Single.

我最终做了类似的事情:

when(userService.logout()).thenReturn(
                    Single.fromObservable(Observable.<whatever>empty()))
Run Code Online (Sandbox Code Playgroud)

我的问题是:

  1. 有没有更好的方法呢?
  2. 我错过了一些我应该知道的重要事情 - 这样的事情实际上并不像我期望的那样.

java rx-java rx-java2

19
推荐指数
2
解决办法
1万
查看次数

如何创建rx.Single的缓存/热门版本?

RxJava v1.0.13引入了一种新的Observable:rx.Single.它非常适合请求 - 响应模型,但缺少引入doOnNext()等运算符的标准副作用.因此,结果会发生很多事情要困难得多.

我的想法是用同一个Single实例的多个订阅替换doOnNext().但这可能导致底层工作多次完成:每次订阅一次.

示例rx.Single实现:

private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> {
    override fun call(sub: SingleSubscriber<in SomeData>) {
        try {
            val result = fetchSomeData()
            sub.onSuccess(result)
        } catch(t: Throwable) {
            sub.onError(t)
        }
    }
}

val single = Single.create<SomeData>(WorkerSubscribe())
Run Code Online (Sandbox Code Playgroud)

用法:

single.subscribe({}, {})
single.subscribe({}, {})   // Data is fetched for the second time
Run Code Online (Sandbox Code Playgroud)

是否有可能创建一个单独的实例,即使多次调用single.subscribe(),也不会多次fetchSomeData(),但缓存并返回相同的结果?

java reactive-programming kotlin rx-java

18
推荐指数
1
解决办法
3266
查看次数

如何在RxJava中的Observable中处理map()中的异常

我想做这个:

Observable.just(bitmap)
            .map(new Func1<Bitmap, File>() {
                @Override
                public File call(Bitmap photoBitmap) {

                    //File creation throws IOException, 
                    //I just want it to hit the onError() inside subscribe()

                    File photoFile = new File(App.getAppContext().getCacheDir(), "userprofilepic_temp.jpg");
                    if(photoFile.isFile()) {//delete the file first if it exists otherwise the new file won't be created
                        photoFile.delete();
                    }
                    photoFile.createNewFile(); //saves the file in the cache dir

                    FileOutputStream fos = new FileOutputStream(photoFile);
                    photoBitmap.compress(Bitmap.CompressFormat.JPEG, 90, fos);//jpeg format
                    fos.close();

                    return photoFile;

                }
            })
            .subscribe(//continue implementation...);
Run Code Online (Sandbox Code Playgroud)

基本上在call()方法中,它可以抛出异常.如何让Observer处理它onError().或者这不是思考这个问题的正确方法吗?

java android rx-java rx-android

18
推荐指数
2
解决办法
2万
查看次数

RxJava + Retrofit - >用于API调用的BaseObservable,用于集中响应处理

我是RxJava的新手所以请原谅我,如果这听起来太新手了:-).

截至目前,我有一个抽象的CallbackClass,它实现了Retofit Callback.在那里我捕获了Callback的"onResponse"和"onError"方法,并在最终转发到自定义实现的方法之前处理各种错误类型.我还使用此集中式类来进行请求/响应应用程序日志记录和其他内容.

例如:对于来自我的服务器的特定错误代码,我在响应正文中收到一个新的Auth令牌,刷新令牌,然后clone.enqueue调用.当然,我的服务器的响应还有其他一些全局行为.

当前解决方案(无Rx):

    public abstract void onResponse(Call<T> call, Response<T> response, boolean isSuccess);

    public abstract void onFailure(Call<T> call, Response<T> response, Throwable t, boolean isTimeout);

    @Override
    public void onResponse(Call<T> call, Response<T> response) {
        if (_isCanceled) return;

        if (response != null && !response.isSuccessful()) {
            if (response.code() == "SomeCode" && retryCount < RETRY_LIMIT) {
                TokenResponseModel newToken = null;
                try {
                    newToken = new Gson().fromJson(new String(response.errorBody().bytes(), "UTF-8"), TokenResponseModel.class);
                } catch (Exception e) {
                    e.printStackTrace();
                }

                    SomeClass.token = newToken.token;
                    retryCount++;
                    call.clone().enqueue(this);
                    return;
                }
            } …
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-android retrofit2

18
推荐指数
1
解决办法
2816
查看次数

无法解析符号AndroidSchedulers

我正在使用版本2.0.0的RxJava,似乎我无法访问AndroidSchedulers.我无法通过RxJava访问mainthread

java android rx-java rx-java2

18
推荐指数
3
解决办法
1万
查看次数

Rxjava2只是方法 - 如何在另一个线程上运行房间插入?

我有一个房间持久数据库插入方法,如下所示:

@Dao
public interface CountriesDao{

    @Insert(onConflict = REPLACE)
    List<Long> addCountries(List<CountryModel> countryModel);
}
Run Code Online (Sandbox Code Playgroud)

我意识到这不能在主线程上运行.以下是我定义数据库的方法:

Room.inMemoryDatabaseBuilder(context.getApplicationContext(), MyDatabase.class).build();
Run Code Online (Sandbox Code Playgroud)

我试图使用rxjava2,以便我不在主线程上运行.我创建了以下方法:

public void storeCountries(List<CountryModel> countriesList) {
        Observable.just(db.countriesDao().addCountries(countriesList))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new DefaultSubscriber<List<Long>>(){
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                super.onSubscribe(d);
            }

            @Override
            public void onNext(@NonNull List<Long> longs) {
                super.onNext(longs);
                Timber.d("insert countries transaction complete");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                super.onError(e);
                Timber.d("error storing countries in db"+e);
            }

            @Override
            public void onComplete() {
                Timber.d("insert countries transaction complete");
            }
        });
    }
Run Code Online (Sandbox Code Playgroud)

对我来说,这显然是在另一个线程上运行.不是主线程,但是当我运行此代码时,我收到以下错误:

完整的堆栈跟踪如下.为什么会这样?

进程:com.mobile.myapp.staging,PID:12990
java.lang.IllegalStateException:调度程序抛出致命异常.引起:java.lang.IllegalStateException:无法访问主线程上的数据库,因为它可能会长时间锁定UI.at io.reactivex.android.schedulers.HandlerScheduler $ …

android rx-java android-room

18
推荐指数
4
解决办法
1万
查看次数

如何重置BehaviorSubject

我有一个BehaviorSubject我想要重置 - 我的意思是我希望最新值不可用,就像它刚刚创建一样.

我似乎没有看到一个API来做这个,但我想有另一种方法来实现相同的结果?

我希望的行为是我需要发出事件,并且我希望订阅者在他们订阅时获得最新事件 - 如果特定经理处于"已启动"状态.但是当这个经理被"停止"时,最新的事件应该不可用(就像它从未在第一时间开始一样).

stream reactive-programming rx-java rx-java2

18
推荐指数
2
解决办法
2万
查看次数

RxJava 2在单元测试中覆盖IO调度程序

我正在尝试测试以下RxKotlin/RxJava 2代码:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }
Run Code Online (Sandbox Code Playgroud)

我正在尝试覆盖调度程序,如下所示:

// Runs before each test suite
RxJavaPlugins.setInitIoSchedulerHandler { Schedulers.trampoline() }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { Schedulers.trampoline() }
Run Code Online (Sandbox Code Playgroud)

但是,运行测试时出现以下错误:

java.lang.ExceptionInInitializerError
...
Caused by: java.lang.NullPointerException: Scheduler Callable result can't be null
    at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
    at io.reactivex.plugins.RxJavaPlugins.applyRequireNonNull(RxJavaPlugins.java:1317)
    at io.reactivex.plugins.RxJavaPlugins.initIoScheduler(RxJavaPlugins.java:306)
    at io.reactivex.schedulers.Schedulers.<clinit>(Schedulers.java:84)
Run Code Online (Sandbox Code Playgroud)

有没有人遇到过这个问题?


使用RxKotlin/RxJava 1和以下调度程序覆盖时,测试工作正常:

RxAndroidPlugins.getInstance().registerSchedulersHook(object : RxAndroidSchedulersHook() {
    override fun getMainThreadScheduler() = Schedulers.immediate()
})

RxJavaPlugins.getInstance().registerSchedulersHook(object : RxJavaSchedulersHook() {
    override fun getIOScheduler() = Schedulers.immediate()
})
Run Code Online (Sandbox Code Playgroud)

谢谢!

kotlin rx-java rx-kotlin

17
推荐指数
4
解决办法
6257
查看次数