标签: rx-java2

Retrofit2 使用 rxjava2 时出现未经验证的 401 错误

自从我集成 RxJava2 以来,我在返回Observable 的所有改造调用中收到 401 未经身份验证的错误,我正在使用基本身份验证,并且我知道错误是由于它造成的,但为什么它在调试时有效,但在发布时无效。

在我看来,retrofit2 的 rxjava 适配器的配置有问题

堆栈跟踪:

com.jakewharton.retrofit2.adapter.rxjava2.HttpException: HTTP 401 Unauthorized
01-22 19:24:14.872 11502-11502/? W/System.err:     at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:54)
01-22 19:24:14.872 11502-11502/? W/System.err:     at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37)
01-22 19:24:14.872 11502-11502/? W/System.err:     at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:43)
01-22 19:24:14.872 11502-11502/? W/System.err:     at io.reactivex.Observable.subscribe(Observable.java:10514)
01-22 19:24:14.872 11502-11502/? W/System.err:     at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
01-22 19:24:14.872 11502-11502/? W/System.err:     at io.reactivex.Observable.subscribe(Observable.java:10514)
01-22 19:24:14.872 11502-11502/? W/System.err:     at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
01-22 19:24:14.872 11502-11502/? W/System.err:     at io.reactivex.Observable.subscribe(Observable.java:10514)
01-22 19:24:14.872 11502-11502/? W/System.err:     at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
01-22 19:24:14.872 11502-11502/? W/System.err:     at io.reactivex.Observable.subscribe(Observable.java:10514)
01-22 19:24:14.872 11502-11502/? …
Run Code Online (Sandbox Code Playgroud)

android basic-authentication retrofit2 rx-java2

6
推荐指数
1
解决办法
1万
查看次数

RxJava 2 相当于 isUnsubscribed

我一直在研究Reactive Programming with RxJava一书中的例子,它针对版本 1 而不是 2。无限流的介绍有以下示例(并注意有更好的方法来处理并发):

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
    Runnabler = () -> {
        BigInteger i = ZERO;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i);
            i = i.add(ONE);
        }
    };
    new Thread(r).start();
});

...

Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
Run Code Online (Sandbox Code Playgroud)

但是,在 RxJava 2 中,传递给create()方法的 lambda 表达式是类型的,ObservableEmitter并且它没有isUnsubscribed()方法。我查看了2.0 中的不同之处,还搜索了存储库,但找不到任何此类方法。

如何在 2.0 中实现相同的功能?

编辑以包含如下给出的解决方案(nb 使用 kotlin):

val naturalNumbers = Observable.create<BigInteger> { emitter ->
    Thread({
        var int: …
Run Code Online (Sandbox Code Playgroud)

rx-java rx-kotlin rx-java2

6
推荐指数
1
解决办法
922
查看次数

使用 RxJava 2.x 是否可以使用 Zip 超过 2 个 Observable?

我正在使用 RxJava 2.x,并且有 3 个 observables(如果重要的话,特别是发布主题)。

我喜欢将它们全部运行一次,并获得一次结果。我正在使用Observable.zip()运算符进行此类过程。但是看起来 Zip 运算符不支持超过 2 个 observables。

是否有其他运算符可以像 zip 一样组合 3 个以上的观察值?

Observable.zip(
        getData(),
        getOtherData(),
        getTemplate(),
        (o1,o2,o3)->{

        });
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-java2

6
推荐指数
1
解决办法
6370
查看次数

如何在Android RxJava中使用flatMap在不使用lambda函数的情况下依次进行三个Web服务调用?

我的 API 客户端

public class ApiClient {
    public static final String BASE_URL = "http://baseurl.com/wp-json/";
    private static Retrofit retrofit = null;


    public static Retrofit getClient() {
        if (retrofit==null) {
            retrofit = new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }
}
Run Code Online (Sandbox Code Playgroud)

我的端点接口

    public interface ApiInterface {

        @GET("posts/category/politics/recent-stories/")
        Observable<ArrayList<ModelNewsPaged>> getPoliticsArrayListObservable(@Query("per_page") int perPage, @Query("page") int page);

        @GET("posts/category/economy/recent-stories/")
        Observable<ArrayList<ModelNewsPaged>> getEconomyArrayListObservable(@Query("per_page") int perPage, @Query("page") int page);

        @GET("posts/category/sports/recent-stories/")
        Observable<ArrayList<ModelNewsPaged>> getSportsArrayListObservable(@Query("per_page") int perPage, @Query("page") int page);
    }
Run Code Online (Sandbox Code Playgroud)

如何在不使用 lambda 函数的情况下使用 flatMap 依次调用 getPoliticsArrayListObservable 和 getEconomyArrayListObservable 和 …

android retrofit2 rx-java2

6
推荐指数
1
解决办法
1850
查看次数

rxjava 抛出 OnErrorNotImplementedException

Observable.create(new ObservableOnSubscribe())
    .subscribeOn(Schedulers.io())
    .compose(new ParserTransformer())
    .map()
    .subscribe()
Run Code Online (Sandbox Code Playgroud)

当我的 ObservableOnSubscribe 出现问题时,它会抛出异常(java.net.UnknownHostException ...)。

可观察订阅:

public void subscribe(ObservableEmitter oe) throws Exception {
    try{
        ...
    } catch {
        if (!oe.isDisposed()) {
            if (ex instanceof IOException) {
                throw new NetException(...); //sometimes, it make the app crash!
            }
        } else{
            ...
        }
    }
    oe.onComplete();
}
Run Code Online (Sandbox Code Playgroud)

这是日志:

D OkHttp: error: java.net.UnknownHostException
D OkHttp: oe.isDisposed: false
W System.err: io.reactivex.exceptions.OnErrorNotImplementedException: Unable to resolve host ******: No address associated with hostname
W System.err:   at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
W System.err:   at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
W …
Run Code Online (Sandbox Code Playgroud)

android rx-java2

6
推荐指数
2
解决办法
7087
查看次数

RxJava:当其中一个流不发出任何信号时如何处理combineLatest()

我使用 mergeLatest() 来组合 3 个可观察数据流。所有这些组合在一起,以便同时显示 UI 中的所有数据。现在,有一种情况,其中一个可观察量不会发出任何内容,因为获取的数据可能为空。

是否有 RxJava 运算符让订阅者知道不会因为空数据而发出任何信号?

编辑

private fun retrieveData() {
    Observable.combineLatest(getCurrentUser.execute(), getLatestGoal.execute(), getLatestLog.execute(),
            Function3<User, Goal, Log, PersonalViewModel> { user, goal, log -> mapToViewModel(user, goal, log) })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe { /*todo: animation*/ }
            .doOnNext { view.setViewModel(it) }
            .doOnComplete { view.stopLoading() }
            .doOnError { /*todo: error message*/ }
            .subscribe()
}
Run Code Online (Sandbox Code Playgroud)

第三个流:当用户有 nog 日志时,getLatestLog.execute() 不发出任何内容。当该流不发出时,整个视图将不可见。

数据是从 FireBase 实时数据库中获取的。ChildEventListener 有一个如下所示的方法:

override fun onChildAdded(dataSnapshot: DataSnapshot?, p1: String?) {
                val log = dataSnapshot?.getValue(Log::class.java)
                log?.let { subscriber.onNext(it) }
                subscriber.onComplete()
                firebaseDatabase.reference.removeEventListener(this)
            }
Run Code Online (Sandbox Code Playgroud)

java android rx-java rx-android rx-java2

6
推荐指数
1
解决办法
5605
查看次数

在 MVVM 架构中处理来自 api 端点的错误的最佳实践是什么?

我的目标(也是问题)是做,比如说,集中的错误处理。在大多数情况下,每个 API 端点的错误将以相同的方式处理,所以我不想有重复或大量if else语句。

我的应用程序的架构对应于developer.android.com 中描述的架构

在此处输入图片说明

所以,这意味着我应该将错误从repovia传递viewModelUI layer (Activity/Fragment),以便从该层进行 UI 更改。

我的代码中的一些小部分:

myService.initiateLogin("Basic " + base64, authBody)
                .enqueue(new Callback<UserTokenModel>() {
                    @Override
                    public void onResponse(Call<UserTokenModel> call, Response<UserTokenModel> response) {
                        userTokenModelMutableLiveData.setValue(response.body());
                    }

                    @Override
                    public void onFailure(Call<UserTokenModel> call, Throwable t) {
                        // TODO better error handling in feature ...
                        userTokenModelMutableLiveData.setValue(null);
                    }
                });
Run Code Online (Sandbox Code Playgroud)

比方说,我们需要表现出吐司每一个onFailure(...)方法调用时或errorBody不会nullonResponse(...)每一个API调用方法。

那么,在保持架构不变的同时进行“集中式”错误处理的建议是什么?

error-handling android mvvm retrofit2 rx-java2

6
推荐指数
1
解决办法
2292
查看次数

带有 MVVM 的 RxJava:带有 setValue() 的 MutableLiveData 与 LiveDataReactiveStreams

Let's say you have a MVVM app with a UI layer, a ViewModel, and a Repository. Say that in your repository, you're getting some data from an API with Single Retrofit calls, and transforming it into a UI-ready viewstate object.

The way I see it, you have two main choices (assuming you want to use LiveData in the UI layer, I am not including the option of observing Rx types from the UI):

  1. Expose your Rx Observable from the repository, …

android android-mvvm rx-java2 android-livedata

6
推荐指数
0
解决办法
423
查看次数

将 HystrixCommands 迁移到 Resilience4j

鉴于 Hystrix 进入维护模式,我一直致力于将(相当大的)代码库迁移到 Resilience4j。

我在 Hystrix 中大量使用以下模式:

new HystrixCommand<SomeReturnValue>(DependencyKeys.DEPENDENCY) {
    @Override
    protected SomeReturnValue run() {
        return someExpensiveCall();
    }
}
    .observe()
Run Code Online (Sandbox Code Playgroud)

我想用 Resilience4j 复制 Hystrix 的一些功能。

到目前为止,我有以下语法来连接外部调用:

resilience.single(DependencyKeys.DEPENDENCY, this::someExpensiveCall);
Run Code Online (Sandbox Code Playgroud)

其中Resilience类提供single方法:

public <T> Single<T> single(ResilienceKey key, Callable<T> callable) {
    return Completable.complete()
            .subscribeOn(Schedulers.computation())
            .observeOn(configuration.scheduler(key))
            .andThen(Single.defer(() -> Single.fromCallable(callable)
                    .lift(CircuitBreakerOperator.of(configuration.circuitBreaker(key)))
                    .lift(RateLimiterOperator.of(configuration.rateLimiter(key)))
                    .lift(BulkheadOperator.of(configuration.bulkhead(key)))
            ))
            .observeOn(Schedulers.computation());
}
Run Code Online (Sandbox Code Playgroud)

在断路和在不同线程池上运行代码方面,这看起来如何更好地类似于 Hystrix,但以更理智的方式。我真的不喜欢用 just 来启动链,这样我就可以在实际的可调用对象被包装之前Completable.complete()强制使用 a 。observeOn

java reactive-programming hystrix rx-java2 resilience4j

6
推荐指数
1
解决办法
3543
查看次数

相同的值作为 nextKey 在从 Paging Library 3 Android 中的 PagingSource 加载的两个连续页面中传递

我从分页 2 迁移到分页 3。我尝试将分页 2 的 ItemKeyedDataSource 实现到分页库 3。但我面临的问题是,在加载的两个连续页面中,相同的值(currentJodId)作为 nextkey 传递。在该应用程序崩溃之后。但是如果我在数据源中添加“keyReuseSupported = true”,应用程序不会崩溃。但它开始调用与 nextkey 相同的项目 ID。

JobSliderRestApi.kt

@GET("job/list/slides")
fun getDetailOfSelectedJob(
    @Query("current_job") currentJodId: Int?,
    @Query("limit") jobLimit: Int?,
    @Query("search_in") fetchType: String?
): Single<Response<JobViewResponse>>
Run Code Online (Sandbox Code Playgroud)

JobViewResponse.kt

data class JobViewResponse(
    @SerializedName("data") val data: ArrayList<JobDetail>?
) : BaseResponse()
Run Code Online (Sandbox Code Playgroud)

JobDetail.kt

data class JobDetail(
    @SerializedName("job_id") val jobId: Int,
    @SerializedName("tuition_type") val jobType: String?,
    @SerializedName("class_image") val jobImage: String,
    @SerializedName("salary") val salary: String,
    @SerializedName("no_of_student") val noOfStudent: Int,
    @SerializedName("student_gender") val studentGender: String,
    @SerializedName("tutor_gender") val preferredTutor: String,
    @SerializedName("days_per_week") val daysPerWeek: String?,
    @SerializedName("other_req") val …
Run Code Online (Sandbox Code Playgroud)

paging android-studio rx-java2

6
推荐指数
1
解决办法
1095
查看次数