我有休息api.
@Get("/serveraction")
public Observable<String> myRequest(@Query("Data") String data);
Run Code Online (Sandbox Code Playgroud)
我知道,okhttp已取消功能(通过请求对象,按标签),但不知道如何使用它与改造和rxjava.使用改造和rxjava实现网络任务取消机制的最佳方法是什么?
我在RxAndroid中使用RxJava和Android应用程序.我正在使用mergeDelayError将两个逆向拟合网络调用组合成一个observable,如果发出一个,它将处理发出的项目,如果有的话,则处理错误.这不起作用,它只会在遇到错误时触发onError操作.现在为了测试这个我转移到一个非常简单的例子,当我有一个onError调用时,仍然不会调用successAction.见下面的例子.
Observable.mergeDelayError(
Observable.error(new RuntimeException()),
Observable.just("Hello")
)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.finallyDo(completeAction)
.subscribe(successAction, errorAction);
Run Code Online (Sandbox Code Playgroud)
只有在使用两个成功的可观察对象时才会调用成功操作.我错过了mergeDelayError应该如何工作的东西?
编辑:
我发现,如果我删除了observeOn,subscribeOn一切都按预期工作.我需要指定线程和思想,这是使用Rx的重点.知道为什么指定那些Schedulers会破坏行为吗?
我正在从Rx 1迁移到Rx 2,突然在阅读帖子时我发现Single应该是用于改装调用的可观察类型.
所以我决定尝试一下,在将我们的改装调用迁移到Rx 2时,我也将返回值更改为Single<whatever>.
现在的问题是,我们的一些测试模拟网络服务类似于:
when(userService.logout()).thenReturn(Observable.empty())
Run Code Online (Sandbox Code Playgroud)
正如您在迁移调用之前所看到的那样,我们过去只是通过告诉userServicemock返回一个空的observable 来完成流.
在迁移到Single调用的"版本"时,我们不再使用,Observable.empty()因为调用不返回Observable,但返回a Single.
我最终做了类似的事情:
when(userService.logout()).thenReturn(
Single.fromObservable(Observable.<whatever>empty()))
Run Code Online (Sandbox Code Playgroud)
我的问题是:
RxJava v1.0.13引入了一种新的Observable:rx.Single.它非常适合请求 - 响应模型,但缺少引入doOnNext()等运算符的标准副作用.因此,结果会发生很多事情要困难得多.
我的想法是用同一个Single实例的多个订阅替换doOnNext().但这可能导致底层工作多次完成:每次订阅一次.
示例rx.Single实现:
private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> {
override fun call(sub: SingleSubscriber<in SomeData>) {
try {
val result = fetchSomeData()
sub.onSuccess(result)
} catch(t: Throwable) {
sub.onError(t)
}
}
}
val single = Single.create<SomeData>(WorkerSubscribe())
Run Code Online (Sandbox Code Playgroud)
用法:
single.subscribe({}, {})
single.subscribe({}, {}) // Data is fetched for the second time
Run Code Online (Sandbox Code Playgroud)
是否有可能创建一个单独的实例,即使多次调用single.subscribe(),也不会多次fetchSomeData(),但缓存并返回相同的结果?
我想做这个:
Observable.just(bitmap)
.map(new Func1<Bitmap, File>() {
@Override
public File call(Bitmap photoBitmap) {
//File creation throws IOException,
//I just want it to hit the onError() inside subscribe()
File photoFile = new File(App.getAppContext().getCacheDir(), "userprofilepic_temp.jpg");
if(photoFile.isFile()) {//delete the file first if it exists otherwise the new file won't be created
photoFile.delete();
}
photoFile.createNewFile(); //saves the file in the cache dir
FileOutputStream fos = new FileOutputStream(photoFile);
photoBitmap.compress(Bitmap.CompressFormat.JPEG, 90, fos);//jpeg format
fos.close();
return photoFile;
}
})
.subscribe(//continue implementation...);
Run Code Online (Sandbox Code Playgroud)
基本上在call()方法中,它可以抛出异常.如何让Observer处理它onError().或者这不是思考这个问题的正确方法吗?
我是RxJava的新手所以请原谅我,如果这听起来太新手了:-).
截至目前,我有一个抽象的CallbackClass,它实现了Retofit Callback.在那里我捕获了Callback的"onResponse"和"onError"方法,并在最终转发到自定义实现的方法之前处理各种错误类型.我还使用此集中式类来进行请求/响应应用程序日志记录和其他内容.
例如:对于来自我的服务器的特定错误代码,我在响应正文中收到一个新的Auth令牌,刷新令牌,然后clone.enqueue调用.当然,我的服务器的响应还有其他一些全局行为.
当前解决方案(无Rx):
public abstract void onResponse(Call<T> call, Response<T> response, boolean isSuccess);
public abstract void onFailure(Call<T> call, Response<T> response, Throwable t, boolean isTimeout);
@Override
public void onResponse(Call<T> call, Response<T> response) {
if (_isCanceled) return;
if (response != null && !response.isSuccessful()) {
if (response.code() == "SomeCode" && retryCount < RETRY_LIMIT) {
TokenResponseModel newToken = null;
try {
newToken = new Gson().fromJson(new String(response.errorBody().bytes(), "UTF-8"), TokenResponseModel.class);
} catch (Exception e) {
e.printStackTrace();
}
SomeClass.token = newToken.token;
retryCount++;
call.clone().enqueue(this);
return;
}
} …Run Code Online (Sandbox Code Playgroud) 我正在使用版本2.0.0的RxJava,似乎我无法访问AndroidSchedulers.我无法通过RxJava访问mainthread
我有一个房间持久数据库插入方法,如下所示:
@Dao
public interface CountriesDao{
@Insert(onConflict = REPLACE)
List<Long> addCountries(List<CountryModel> countryModel);
}
Run Code Online (Sandbox Code Playgroud)
我意识到这不能在主线程上运行.以下是我定义数据库的方法:
Room.inMemoryDatabaseBuilder(context.getApplicationContext(), MyDatabase.class).build();
Run Code Online (Sandbox Code Playgroud)
我试图使用rxjava2,以便我不在主线程上运行.我创建了以下方法:
public void storeCountries(List<CountryModel> countriesList) {
Observable.just(db.countriesDao().addCountries(countriesList))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new DefaultSubscriber<List<Long>>(){
@Override
public void onSubscribe(@NonNull Disposable d) {
super.onSubscribe(d);
}
@Override
public void onNext(@NonNull List<Long> longs) {
super.onNext(longs);
Timber.d("insert countries transaction complete");
}
@Override
public void onError(@NonNull Throwable e) {
super.onError(e);
Timber.d("error storing countries in db"+e);
}
@Override
public void onComplete() {
Timber.d("insert countries transaction complete");
}
});
}
Run Code Online (Sandbox Code Playgroud)
对我来说,这显然是在另一个线程上运行.不是主线程,但是当我运行此代码时,我收到以下错误:
完整的堆栈跟踪如下.为什么会这样?
进程:com.mobile.myapp.staging,PID:12990
java.lang.IllegalStateException:调度程序抛出致命异常.引起:java.lang.IllegalStateException:无法访问主线程上的数据库,因为它可能会长时间锁定UI.at io.reactivex.android.schedulers.HandlerScheduler $ …
我有一个BehaviorSubject我想要重置 - 我的意思是我希望最新值不可用,就像它刚刚创建一样.
我似乎没有看到一个API来做这个,但我想有另一种方法来实现相同的结果?
我希望的行为是我需要发出事件,并且我希望订阅者在他们订阅时获得最新事件 - 如果特定经理处于"已启动"状态.但是当这个经理被"停止"时,最新的事件应该不可用(就像它从未在第一时间开始一样).
我正在尝试测试以下RxKotlin/RxJava 2代码:
validate(data)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap { ... }
Run Code Online (Sandbox Code Playgroud)
我正在尝试覆盖调度程序,如下所示:
// Runs before each test suite
RxJavaPlugins.setInitIoSchedulerHandler { Schedulers.trampoline() }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { Schedulers.trampoline() }
Run Code Online (Sandbox Code Playgroud)
但是,运行测试时出现以下错误:
java.lang.ExceptionInInitializerError
...
Caused by: java.lang.NullPointerException: Scheduler Callable result can't be null
at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
at io.reactivex.plugins.RxJavaPlugins.applyRequireNonNull(RxJavaPlugins.java:1317)
at io.reactivex.plugins.RxJavaPlugins.initIoScheduler(RxJavaPlugins.java:306)
at io.reactivex.schedulers.Schedulers.<clinit>(Schedulers.java:84)
Run Code Online (Sandbox Code Playgroud)
有没有人遇到过这个问题?
使用RxKotlin/RxJava 1和以下调度程序覆盖时,测试工作正常:
RxAndroidPlugins.getInstance().registerSchedulersHook(object : RxAndroidSchedulersHook() {
override fun getMainThreadScheduler() = Schedulers.immediate()
})
RxJavaPlugins.getInstance().registerSchedulersHook(object : RxJavaSchedulersHook() {
override fun getIOScheduler() = Schedulers.immediate()
})
Run Code Online (Sandbox Code Playgroud)
谢谢!