我有一个PublishSubject
和一个Subscriber
我用来处理(可能)无限的预处理数据流.问题是某些元素可能包含一些错误.我想忽略它们并继续处理.我怎么能这样做?我尝试过这样的事情:
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
Run Code Online (Sandbox Code Playgroud)
我的问题是没有任何错误处理方法可以帮助我:
onErrorResumeNext()
- 指示Observable在遇到错误时发出一系列项目
onErrorReturn(?)
- 指示Observable在遇到错误时发出特定项目
onExceptionResumeNext(?)
- 指示Observable在遇到异常后继续发出项目(但不是另一种可抛出的项目)
retry(?)
- 如果源Observable发出错误,请重新订阅它,希望它能完成而不会出错
retryWhen(?)
- 如果源Observable发出错误,请将该错误传递给另一个Observable以确定是否重新订阅源
我尝试retry()
了例子,但它无限期地在错误之后挂起我的进程.
我也试过,onErrorResumeNext()
但它没有按预期工作:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar") …
Run Code Online (Sandbox Code Playgroud) 我的用例是想在我的onNext中的某个条件之后处理.所以尝试使用DisposableObserver.这是有效的代码
Observable.just(1, 2, 3, 4)
.subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext() received: " + integer);
if (integer == 2) {
dispose();
}
}
@Override
public void onError(Throwable e) { System.out.println("onError()"); }
@Override
public void onComplete() { System.out.println("onComplete()"); }
}
);
Run Code Online (Sandbox Code Playgroud)
现在,如果你尝试用lambda替换它,它会将lambda视为
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
Run Code Online (Sandbox Code Playgroud)
现在这样做.通过从onSubscribe保存一次性用品,然后调用disposable.dispose(); 来自onNext.
private Disposable disposable;
private void disposableObserverTest() {
Observable.just(1, 2, 3, 4)
.subscribe(integer -> {
System.out.println("onNext() received: " + integer);
if (integer == 2) { …
Run Code Online (Sandbox Code Playgroud) 似乎RxJava中有两种错误:
onError
RxJavaPlugins.setErrorHandler
我很难理解为什么会这样.问题:
onError
?我正在努力寻找将两个Flowable压缩为一个的RxJava2示例。
我正在尝试修改此测试,使其包含以下内容:
Integer[] ints = new Integer[count];
Integer[] moreints = new Integer[count];
Arrays.fill(ints, 777);
Arrays.fill(moreints, 777);
Flowable<Integer> source = Flowable.fromArray(ints);
Flowable<Integer> anothersource = Flowable.fromArray(moreints);
Flowable<Integer> zippedsources = Flowable.zip(source, anothersource,
new BiFunction<Flowable<Integer>, Flowable<Integer>, Flowable<Integer>>() {
@Override
public void apply(Flowable<Integer> arg0, Flowable<Integer> arg1) throws Exception {
return arg0.blockingFirst() + arg1.blockingLast();
}
}).runOn(Schedulers.computation()).map(this).sequential();
Run Code Online (Sandbox Code Playgroud)
编辑:我正在尝试从源和另一个源中获取一个Integer并将它们加起来,但这似乎与RxJava1的实现方式根本不同...我尝试了一堆返回Integer,Publisher,Flowable和void的变体,但一直在获取zip运算符本身上的Eclipse中的错误。
我无法弄清楚到底在哪里 .zip(Iterable<? extends Publisher<? extends T>>, Function<? super Object[], ? extends R>).
我将可观察对象放在onPause()中,我想在onResume()中重新启动它。我怎样才能做到这一点?
这是我的观察结果:
Observable<ObdCommandResult> myObservable = Observable.create(new ObservableOnSubscribe<ObdCommandResult>() {
@Override
public void subscribe(ObservableEmitter<ObdCommandResult> e) throws Exception {
...
socket.connect();
new ObdResetCommand().run(socket.getInputStream(), socket.getOutputStream());
new EchoOffCommand().run(socket.getInputStream(), socket.getOutputStream());
new LineFeedOffCommand().run(socket.getInputStream(), socket.getOutputStream());
new SelectProtocolCommand(ObdProtocols.AUTO).run(socket.getInputStream(), socket.getOutputStream());
ObdCommandResult obdCommandResult = new ObdCommandResult();
while (!Thread.currentThread().isInterrupted()) {
for (int i = 1; i < 5; i++) {
try {
livedataObdCommandList.get(i-1).getCommand().run(socket.getInputStream(), socket.getOutputStream());
obdCommandResult.setId(i);
obdCommandResult.setValue(livedataObdCommandList.get(i-1).getCommand().getFormattedResult());
e.onNext(obdCommandResult);
}catch (UnsupportedCommandException uce) {
} catch (InterruptedException ie) {
}
}
}
});
Run Code Online (Sandbox Code Playgroud)
...以及我的观察者:
Observer<ObdCommandResult> observer = new Observer<ObdCommandResult>() {
@Override
public void onNext(ObdCommandResult value) …
Run Code Online (Sandbox Code Playgroud) 我希望集成retrofit
有RxJava
,但它无法解析符号“RxJavaCallAdapterFactory”。
import io.reactivex.Observable;
import retrofit2.http.Field;
import retrofit2.http.FormUrlEncoded;
import retrofit2.http.POST;
public interface RetrofitService {
@FormUrlEncoded
@POST("api/do.php")
Observable<String> getUserToken(@Field("action") String action,
@Field("name") String name,
@Field("password") String password);
}
Run Code Online (Sandbox Code Playgroud)
import android.leedev.cn.readertool.R;
import android.leedev.cn.readertool.model.http.service.RetrofitService;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.converter.scalars.ScalarsConverterFactory;
public class MainActivity extends AppCompatActivity {
private static final String TAG = MainActivity.class.getSimpleName();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
login();
} …
Run Code Online (Sandbox Code Playgroud) 如何将Flowable转换为Single?或者,如果有另一种方法使其在第一个感兴趣的响应之后停止发光.
我试过这个,但它似乎不起作用:
disposables.add(
repository.getAllSomethings()
.subscribeOn(SchedulerProvider.getInstance().computation())
.observeOn(SchedulerProvider.getInstance().ui())
.toSingle()
.subscribeWith(object : DisposableSingleObserver<List<Something>>() {
override fun onSuccess(t: List<Something>) {
}
override fun onError(e: Throwable) {
}
})
Run Code Online (Sandbox Code Playgroud)
getAllSomethings()返回一个 Flowable<List<Something>>
在上面的代码.subscribeWith()
中用红色下划线表示:
Type parameter bound for E in
fun <E : SingleObserver<in Flowable<List<Something>!>!>!> subscribeWith(observer: E!): E!
is not satisfied: inferred type ! is not a subtype of SingleObserver<in Flowable<List<Something>!>!>!
Run Code Online (Sandbox Code Playgroud) 我有以下代码
sealed class AddressUiState
object AddressLoading : AddressUiState()
class AddressLoadedState(val addressResponse: AddressBookResponse) : AddressUiState()
class AddressErrorState(val error: Throwable) : AddressUiState()
Run Code Online (Sandbox Code Playgroud)
我有如下的ViewModel
class AddressViewModel constructor(private val service: SingleProfileService) : ViewModel() {
fun getDisplayableAddressState(id: String): Observable<out AddressUiState> {
return service.getAddresses(id)
.map { AddressLoadedState(it) }
.startWith(AddressLoading)
.onErrorReturn { AddressErrorState(it) }
.subscribeOn(Schedulers.io())
}
}
Run Code Online (Sandbox Code Playgroud)
我看到编译错误和错误类型不匹配的 onErrorReturn 。必填:AddressLoadedState!找到:AddressErrorState 上面的代码有什么问题?
我想知道将 APIKEY 放入所有 REST 请求中的最佳方法,而不必将其添加到请求的参数中。
目前我只接到了几个电话,但我正在尝试进一步了解。
@GET(".")
fun getSearch(@Query("s") text: String, @Query("apikey") APIKEY: String) : Observable<ResponseSearch>
Run Code Online (Sandbox Code Playgroud)
我想知道是否有办法不在每次调用的变量中包含 APIKEY
我想限制/test
API 调用每 3 秒调用一次,例如:
2021-09-21 14:09:19.920 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.031 V/OkHttp: <-- 200 https://xxx/test (109ms)
2021-09-21 14:09:20.038 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.136 V/OkHttp: <-- 200 https://xxx/test (96ms)
2021-09-21 14:09:20.146 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.315 V/OkHttp: <-- 200 https://xxx/test (168ms)
2021-09-21 14:09:20.325 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.499 V/OkHttp: <-- 200 https://xxx/test (172ms)
2021-09-21 14:09:20.514 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.615 V/OkHttp: <-- 200 https://xxx/test (100ms)
2021-09-21 14:09:20.628 V/OkHttp: --> GET https://xxx/test
2021-09-21 14:09:20.721 V/OkHttp: …
Run Code Online (Sandbox Code Playgroud)