自从我集成 RxJava2 以来,我在返回Observable 的所有改造调用中收到 401 未经身份验证的错误,我正在使用基本身份验证,并且我知道错误是由于它造成的,但为什么它在调试时有效,但在发布时无效。
在我看来,retrofit2 的 rxjava 适配器的配置有问题
com.jakewharton.retrofit2.adapter.rxjava2.HttpException: HTTP 401 Unauthorized
01-22 19:24:14.872 11502-11502/? W/System.err: at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:54)
01-22 19:24:14.872 11502-11502/? W/System.err: at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37)
01-22 19:24:14.872 11502-11502/? W/System.err: at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:43)
01-22 19:24:14.872 11502-11502/? W/System.err: at io.reactivex.Observable.subscribe(Observable.java:10514)
01-22 19:24:14.872 11502-11502/? W/System.err: at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
01-22 19:24:14.872 11502-11502/? W/System.err: at io.reactivex.Observable.subscribe(Observable.java:10514)
01-22 19:24:14.872 11502-11502/? W/System.err: at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
01-22 19:24:14.872 11502-11502/? W/System.err: at io.reactivex.Observable.subscribe(Observable.java:10514)
01-22 19:24:14.872 11502-11502/? W/System.err: at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
01-22 19:24:14.872 11502-11502/? W/System.err: at io.reactivex.Observable.subscribe(Observable.java:10514)
01-22 19:24:14.872 11502-11502/? …
Run Code Online (Sandbox Code Playgroud) 我一直在研究Reactive Programming with RxJava一书中的例子,它针对版本 1 而不是 2。无限流的介绍有以下示例(并注意有更好的方法来处理并发):
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
Runnabler = () -> {
BigInteger i = ZERO;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
i = i.add(ONE);
}
};
new Thread(r).start();
});
...
Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
Run Code Online (Sandbox Code Playgroud)
但是,在 RxJava 2 中,传递给create()
方法的 lambda 表达式是类型的,ObservableEmitter
并且它没有isUnsubscribed()
方法。我查看了2.0 中的不同之处,还搜索了存储库,但找不到任何此类方法。
如何在 2.0 中实现相同的功能?
编辑以包含如下给出的解决方案(nb 使用 kotlin):
val naturalNumbers = Observable.create<BigInteger> { emitter ->
Thread({
var int: …
Run Code Online (Sandbox Code Playgroud) 我正在使用 RxJava 2.x,并且有 3 个 observables(如果重要的话,特别是发布主题)。
我喜欢将它们全部运行一次,并获得一次结果。我正在使用Observable.zip()
运算符进行此类过程。但是看起来 Zip 运算符不支持超过 2 个 observables。
是否有其他运算符可以像 zip 一样组合 3 个以上的观察值?
Observable.zip(
getData(),
getOtherData(),
getTemplate(),
(o1,o2,o3)->{
});
Run Code Online (Sandbox Code Playgroud) 我的 API 客户端
public class ApiClient {
public static final String BASE_URL = "http://baseurl.com/wp-json/";
private static Retrofit retrofit = null;
public static Retrofit getClient() {
if (retrofit==null) {
retrofit = new Retrofit.Builder()
.baseUrl(BASE_URL)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
}
return retrofit;
}
}
Run Code Online (Sandbox Code Playgroud)
我的端点接口
public interface ApiInterface {
@GET("posts/category/politics/recent-stories/")
Observable<ArrayList<ModelNewsPaged>> getPoliticsArrayListObservable(@Query("per_page") int perPage, @Query("page") int page);
@GET("posts/category/economy/recent-stories/")
Observable<ArrayList<ModelNewsPaged>> getEconomyArrayListObservable(@Query("per_page") int perPage, @Query("page") int page);
@GET("posts/category/sports/recent-stories/")
Observable<ArrayList<ModelNewsPaged>> getSportsArrayListObservable(@Query("per_page") int perPage, @Query("page") int page);
}
Run Code Online (Sandbox Code Playgroud)
如何在不使用 lambda 函数的情况下使用 flatMap 依次调用 getPoliticsArrayListObservable 和 getEconomyArrayListObservable 和 …
Observable.create(new ObservableOnSubscribe())
.subscribeOn(Schedulers.io())
.compose(new ParserTransformer())
.map()
.subscribe()
Run Code Online (Sandbox Code Playgroud)
当我的 ObservableOnSubscribe 出现问题时,它会抛出异常(java.net.UnknownHostException ...)。
可观察订阅:
public void subscribe(ObservableEmitter oe) throws Exception {
try{
...
} catch {
if (!oe.isDisposed()) {
if (ex instanceof IOException) {
throw new NetException(...); //sometimes, it make the app crash!
}
} else{
...
}
}
oe.onComplete();
}
Run Code Online (Sandbox Code Playgroud)
这是日志:
D OkHttp: error: java.net.UnknownHostException
D OkHttp: oe.isDisposed: false
W System.err: io.reactivex.exceptions.OnErrorNotImplementedException: Unable to resolve host ******: No address associated with hostname
W System.err: at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
W System.err: at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
W …
Run Code Online (Sandbox Code Playgroud) 我使用 mergeLatest() 来组合 3 个可观察数据流。所有这些组合在一起,以便同时显示 UI 中的所有数据。现在,有一种情况,其中一个可观察量不会发出任何内容,因为获取的数据可能为空。
是否有 RxJava 运算符让订阅者知道不会因为空数据而发出任何信号?
编辑
private fun retrieveData() {
Observable.combineLatest(getCurrentUser.execute(), getLatestGoal.execute(), getLatestLog.execute(),
Function3<User, Goal, Log, PersonalViewModel> { user, goal, log -> mapToViewModel(user, goal, log) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { /*todo: animation*/ }
.doOnNext { view.setViewModel(it) }
.doOnComplete { view.stopLoading() }
.doOnError { /*todo: error message*/ }
.subscribe()
}
Run Code Online (Sandbox Code Playgroud)
第三个流:当用户有 nog 日志时,getLatestLog.execute() 不发出任何内容。当该流不发出时,整个视图将不可见。
数据是从 FireBase 实时数据库中获取的。ChildEventListener 有一个如下所示的方法:
override fun onChildAdded(dataSnapshot: DataSnapshot?, p1: String?) {
val log = dataSnapshot?.getValue(Log::class.java)
log?.let { subscriber.onNext(it) }
subscriber.onComplete()
firebaseDatabase.reference.removeEventListener(this)
}
Run Code Online (Sandbox Code Playgroud) 我的目标(也是问题)是做,比如说,集中的错误处理。在大多数情况下,每个 API 端点的错误将以相同的方式处理,所以我不想有重复或大量if else
语句。
我的应用程序的架构对应于developer.android.com 中描述的架构
所以,这意味着我应该将错误从repo
via传递viewModel
到UI layer (Activity/Fragment)
,以便从该层进行 UI 更改。
我的代码中的一些小部分:
myService.initiateLogin("Basic " + base64, authBody)
.enqueue(new Callback<UserTokenModel>() {
@Override
public void onResponse(Call<UserTokenModel> call, Response<UserTokenModel> response) {
userTokenModelMutableLiveData.setValue(response.body());
}
@Override
public void onFailure(Call<UserTokenModel> call, Throwable t) {
// TODO better error handling in feature ...
userTokenModelMutableLiveData.setValue(null);
}
});
Run Code Online (Sandbox Code Playgroud)
比方说,我们需要表现出吐司每一个onFailure(...)
方法调用时或errorBody
不会null
在onResponse(...)
每一个API调用方法。
那么,在保持架构不变的同时进行“集中式”错误处理的建议是什么?
Let's say you have a MVVM app with a UI layer, a ViewModel, and a Repository. Say that in your repository, you're getting some data from an API with Single
Retrofit calls, and transforming it into a UI-ready viewstate object.
The way I see it, you have two main choices (assuming you want to use LiveData in the UI layer, I am not including the option of observing Rx types from the UI):
Expose your Rx Observable
from the repository, …
鉴于 Hystrix 进入维护模式,我一直致力于将(相当大的)代码库迁移到 Resilience4j。
我在 Hystrix 中大量使用以下模式:
new HystrixCommand<SomeReturnValue>(DependencyKeys.DEPENDENCY) {
@Override
protected SomeReturnValue run() {
return someExpensiveCall();
}
}
.observe()
Run Code Online (Sandbox Code Playgroud)
我想用 Resilience4j 复制 Hystrix 的一些功能。
到目前为止,我有以下语法来连接外部调用:
resilience.single(DependencyKeys.DEPENDENCY, this::someExpensiveCall);
Run Code Online (Sandbox Code Playgroud)
其中Resilience
类提供single
方法:
public <T> Single<T> single(ResilienceKey key, Callable<T> callable) {
return Completable.complete()
.subscribeOn(Schedulers.computation())
.observeOn(configuration.scheduler(key))
.andThen(Single.defer(() -> Single.fromCallable(callable)
.lift(CircuitBreakerOperator.of(configuration.circuitBreaker(key)))
.lift(RateLimiterOperator.of(configuration.rateLimiter(key)))
.lift(BulkheadOperator.of(configuration.bulkhead(key)))
))
.observeOn(Schedulers.computation());
}
Run Code Online (Sandbox Code Playgroud)
在断路和在不同线程池上运行代码方面,这看起来如何更好地类似于 Hystrix,但以更理智的方式。我真的不喜欢用 just 来启动链,这样我就可以在实际的可调用对象被包装之前Completable.complete()
强制使用 a 。observeOn
我从分页 2 迁移到分页 3。我尝试将分页 2 的 ItemKeyedDataSource 实现到分页库 3。但我面临的问题是,在加载的两个连续页面中,相同的值(currentJodId)作为 nextkey 传递。在该应用程序崩溃之后。但是如果我在数据源中添加“keyReuseSupported = true”,应用程序不会崩溃。但它开始调用与 nextkey 相同的项目 ID。
JobSliderRestApi.kt
@GET("job/list/slides")
fun getDetailOfSelectedJob(
@Query("current_job") currentJodId: Int?,
@Query("limit") jobLimit: Int?,
@Query("search_in") fetchType: String?
): Single<Response<JobViewResponse>>
Run Code Online (Sandbox Code Playgroud)
JobViewResponse.kt
data class JobViewResponse(
@SerializedName("data") val data: ArrayList<JobDetail>?
) : BaseResponse()
Run Code Online (Sandbox Code Playgroud)
JobDetail.kt
data class JobDetail(
@SerializedName("job_id") val jobId: Int,
@SerializedName("tuition_type") val jobType: String?,
@SerializedName("class_image") val jobImage: String,
@SerializedName("salary") val salary: String,
@SerializedName("no_of_student") val noOfStudent: Int,
@SerializedName("student_gender") val studentGender: String,
@SerializedName("tutor_gender") val preferredTutor: String,
@SerializedName("days_per_week") val daysPerWeek: String?,
@SerializedName("other_req") val …
Run Code Online (Sandbox Code Playgroud) rx-java2 ×10
android ×7
retrofit2 ×3
rx-java ×3
java ×2
android-mvvm ×1
hystrix ×1
mvvm ×1
paging ×1
resilience4j ×1
rx-android ×1
rx-kotlin ×1