标签: rx-java2

如何在RxJava 2中发生错误后继续处理?

我有一个PublishSubject和一个Subscriber我用来处理(可能)无限的预处理数据流.问题是某些元素可能包含一些错误.我想忽略它们并继续处理.我怎么能这样做?我尝试过这样的事情:

    val subject = PublishSubject.create<String>()
    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()
Run Code Online (Sandbox Code Playgroud)

我的问题是没有任何错误处理方法可以帮助我:

onErrorResumeNext() - 指示Observable在遇到错误时发出一系列项目

onErrorReturn(?) - 指示Observable在遇到错误时发出特定项目

onExceptionResumeNext(?) - 指示Observable在遇到异常后继续发出项目(但不是另一种可抛出的项目)

retry(?) - 如果源Observable发出错误,请重新订阅它,希望它能完成而不会出错

retryWhen(?) - 如果源Observable发出错误,请将该错误传递给另一个Observable以确定是否重新订阅源

我尝试retry()了例子,但它无限期地在错误之后挂起我的进程.

我也试过,onErrorResumeNext()但它没有按预期工作:

    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject
    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar") …
Run Code Online (Sandbox Code Playgroud)

java kotlin rx-java2

5
推荐指数
1
解决办法
2522
查看次数

如何在RxJava2中使用带有lambda表达式的DisposableObserver

我的用例是想在我的onNext中的某个条件之后处理.所以尝试使用DisposableObserver.这是有效的代码

Observable.just(1, 2, 3, 4)
    .subscribe(new DisposableObserver<Integer>() {
                     @Override
                     public void onNext(Integer integer) {
                       System.out.println("onNext() received: " + integer);
                       if (integer == 2) {
                         dispose();
                       }
                     }
                     @Override
                     public void onError(Throwable e) { System.out.println("onError()"); }
                     @Override
                     public void onComplete() { System.out.println("onComplete()"); }
                   }
    );
Run Code Online (Sandbox Code Playgroud)

现在,如果你尝试用lambda替换它,它会将lambda视为

subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
Run Code Online (Sandbox Code Playgroud)

现在这样做.通过从onSubscribe保存一次性用品,然后调用disposable.dispose(); 来自onNext.

  private Disposable disposable;
  private void disposableObserverTest() {
    Observable.just(1, 2, 3, 4)
        .subscribe(integer -> {
              System.out.println("onNext() received: " + integer);
              if (integer == 2) { …
Run Code Online (Sandbox Code Playgroud)

lambda android rx-java rx-android rx-java2

5
推荐指数
1
解决办法
1348
查看次数

在RxJava中,RxJavaPlugins.setErrorHandler和Subscribe onError有什么区别?

似乎RxJava中有两种错误:

  1. 订阅者捕获的错误 onError
  2. 并且由设置的处理程序全局捕获错误 RxJavaPlugins.setErrorHandler

我很难理解为什么会这样.问题:

  • 有两个错误处理程序背后的理由是什么?
  • 导致错误发送到一个处理程序与另一个处理程序的原因是什么?
  • 如何确保仅将错误发送给onError

java rx-java rx-java2

5
推荐指数
1
解决办法
1212
查看次数

RxJava2将两个Flowable压缩为一个

我正在努力寻找将两个Flowable压缩为一个的RxJava2示例。

我正在尝试修改此测试,使其包含以下内容:

    Integer[] ints = new Integer[count];
    Integer[] moreints = new Integer[count];
    Arrays.fill(ints, 777);
    Arrays.fill(moreints, 777);

    Flowable<Integer> source = Flowable.fromArray(ints);
    Flowable<Integer> anothersource = Flowable.fromArray(moreints);

    Flowable<Integer> zippedsources = Flowable.zip(source, anothersource,
            new BiFunction<Flowable<Integer>, Flowable<Integer>, Flowable<Integer>>() {

                @Override
                public void apply(Flowable<Integer> arg0, Flowable<Integer> arg1) throws Exception {
                    return arg0.blockingFirst() + arg1.blockingLast();
                }

    }).runOn(Schedulers.computation()).map(this).sequential();
Run Code Online (Sandbox Code Playgroud)

编辑:我正在尝试从源和另一个源中获取一个Integer并将它们加起来,但这似乎与RxJava1的实现方式根本不同...我尝试了一堆返回Integer,Publisher,Flowable和void的变体,但一直在获取zip运算符本身上的Eclipse中的错误。

我无法弄清楚到底在哪里 .zip(Iterable<? extends Publisher<? extends T>>, Function<? super Object[], ? extends R>).

java reactivex rx-java2

5
推荐指数
1
解决办法
3385
查看次数

在RxJava 2中配置后,如何重新启动发射?

我将可观察对象放在onPause()中,我想在onResume()中重新启动它。我怎样才能做到这一点?

这是我的观察结果:

    Observable<ObdCommandResult> myObservable = Observable.create(new ObservableOnSubscribe<ObdCommandResult>() {
        @Override
        public void subscribe(ObservableEmitter<ObdCommandResult> e) throws Exception {
            ...
            socket.connect();

            new ObdResetCommand().run(socket.getInputStream(), socket.getOutputStream());
            new EchoOffCommand().run(socket.getInputStream(), socket.getOutputStream());
            new LineFeedOffCommand().run(socket.getInputStream(), socket.getOutputStream());
            new SelectProtocolCommand(ObdProtocols.AUTO).run(socket.getInputStream(), socket.getOutputStream());
            ObdCommandResult obdCommandResult = new ObdCommandResult();
            while (!Thread.currentThread().isInterrupted()) {
                for (int i = 1; i < 5; i++) {
                    try {
                        livedataObdCommandList.get(i-1).getCommand().run(socket.getInputStream(), socket.getOutputStream());
                        obdCommandResult.setId(i);
                        obdCommandResult.setValue(livedataObdCommandList.get(i-1).getCommand().getFormattedResult());
                        e.onNext(obdCommandResult);
                    }catch (UnsupportedCommandException uce) {
                    } catch (InterruptedException ie) {
                }
            }
        }
    });
Run Code Online (Sandbox Code Playgroud)

...以及我的观察者:

    Observer<ObdCommandResult> observer = new Observer<ObdCommandResult>() {
        @Override
        public void onNext(ObdCommandResult value) …
Run Code Online (Sandbox Code Playgroud)

android dispose rx-android rx-java2

5
推荐指数
1
解决办法
282
查看次数

无法解析符号“ RxJavaCallAdapterFactory”

我希望集成retrofitRxJava,但它无法解析符号“RxJavaCallAdapterFactory”。

我的代码

我的依赖

这是我的代码

RetrofitService.java

import io.reactivex.Observable;
import retrofit2.http.Field;
import retrofit2.http.FormUrlEncoded;
import retrofit2.http.POST;

public interface RetrofitService {

    @FormUrlEncoded
    @POST("api/do.php")
    Observable<String> getUserToken(@Field("action") String action,
                                    @Field("name") String name,
                                    @Field("password") String password);
}
Run Code Online (Sandbox Code Playgroud)

MainActivity.java

import android.leedev.cn.readertool.R;
import android.leedev.cn.readertool.model.http.service.RetrofitService;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.converter.scalars.ScalarsConverterFactory;

public class MainActivity extends AppCompatActivity {

    private static final String TAG = MainActivity.class.getSimpleName();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        login();
    } …
Run Code Online (Sandbox Code Playgroud)

java android rx-java retrofit2 rx-java2

5
推荐指数
1
解决办法
2864
查看次数

RxJava2将Flowable转换为Single

如何将Flowable转换为Single?或者,如果有另一种方法使其在第一个感兴趣的响应之后停止发光.

我试过这个,但它似乎不起作用:

  disposables.add(
        repository.getAllSomethings()
           .subscribeOn(SchedulerProvider.getInstance().computation())
           .observeOn(SchedulerProvider.getInstance().ui())
           .toSingle()
           .subscribeWith(object : DisposableSingleObserver<List<Something>>() {
                override fun onSuccess(t: List<Something>) {
                }

                override fun onError(e: Throwable) {
                }
            })
Run Code Online (Sandbox Code Playgroud)

getAllSomethings()返回一个 Flowable<List<Something>>

在上面的代码.subscribeWith()中用红色下划线表示:

Type parameter bound for E in 
fun <E : SingleObserver<in Flowable<List<Something>!>!>!> subscribeWith(observer: E!): E!
is not satisfied: inferred type  ! is not a subtype of SingleObserver<in Flowable<List<Something>!>!>!
Run Code Online (Sandbox Code Playgroud)

android rx-java2

5
推荐指数
1
解决办法
6832
查看次数

在Kotlin中将密封类与rxjava一起使用时获取类型不匹配

我有以下代码

sealed class AddressUiState
object AddressLoading : AddressUiState()
class AddressLoadedState(val addressResponse: AddressBookResponse) : AddressUiState()
class AddressErrorState(val error: Throwable) : AddressUiState()
Run Code Online (Sandbox Code Playgroud)

我有如下的ViewModel

class AddressViewModel constructor(private val service: SingleProfileService) : ViewModel() {

    fun getDisplayableAddressState(id: String): Observable<out AddressUiState> {
        return service.getAddresses(id)
                .map { AddressLoadedState(it) }
                .startWith(AddressLoading)
                .onErrorReturn { AddressErrorState(it) }
                .subscribeOn(Schedulers.io())

    }
}
Run Code Online (Sandbox Code Playgroud)

我看到编译错误和错误类型不匹配的 onErrorReturn 必填:AddressLoadedState!找到:AddressErrorState 上面的代码有什么问题?

kotlin rx-java rx-java2

5
推荐指数
1
解决办法
545
查看次数

我是否应该在使用 Retrofit 执行的每个请求中传递我的 APIKEY?

我想知道将 APIKEY 放入所有 REST 请求中的最佳方法,而不必将其添加到请求的参数中。

目前我只接到了几个电话,但我正在尝试进一步了解。

@GET(".")
fun getSearch(@Query("s") text: String, @Query("apikey") APIKEY: String) : Observable<ResponseSearch>
Run Code Online (Sandbox Code Playgroud)

我想知道是否有办法不在每次调用的变量中包含 APIKEY

api-key retrofit2 rx-java2

5
推荐指数
1
解决办法
2807
查看次数

Android 将特定 API 调用限制为 3 秒内一次

我想限制/testAPI 调用每 3 秒调用一次,例如:

2021-09-21 14:09:19.920 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.031 V/OkHttp: <-- 200 https://xxx/test (109ms)
2021-09-21 14:09:20.038 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.136 V/OkHttp: <-- 200 https://xxx/test (96ms)
2021-09-21 14:09:20.146 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.315 V/OkHttp: <-- 200 https://xxx/test (168ms)
2021-09-21 14:09:20.325 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.499 V/OkHttp: <-- 200 https://xxx/test (172ms)
2021-09-21 14:09:20.514 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.615 V/OkHttp: <-- 200 https://xxx/test (100ms)
2021-09-21 14:09:20.628 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.721 V/OkHttp: …
Run Code Online (Sandbox Code Playgroud)

android retrofit2 rx-java2

5
推荐指数
1
解决办法
477
查看次数