我有一个名为verifyData的服务。我需要多次调用此服务。
服务
@FormUrlEncoded
@POST("verifyData")
Observable<Adeudos> Adeudos(
@Field("id") int id_user
);
Run Code Online (Sandbox Code Playgroud)
简单调用
Observable<Adeudos> respuesta = services.verifyData(1);
respuesta.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Adeudos>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Adeudos adeudos) {
}
});
Run Code Online (Sandbox Code Playgroud)
我需要使用这个数组来执行这个“方法”
List<String> ids = new ArrayList(); // 1,2,3,4,5,6,7
Run Code Online (Sandbox Code Playgroud)
解决方案
添加retrolambda在我的gradle产出
爪哇
Observable.from(ids)
.flatMap(s -> services.verifyData(ids).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Adeudos>() {
@Override
public void onCompleted() {
Log.e("Completed :"," Completed\n");
}
@Override
public void onError(Throwable e) …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用一个缓冲区,在rxjava中打开和克隆边界,但我无法让它工作.我想做的是假设一个可观察的发射,0,1,2,3,0,1,3,0,3我想以{0,1,2,3},{0,1结束,3},{0,3}.
这是我到目前为止的代码:
PublishSubject openning = PublishSubject.create();
openning.doOnNext(new Consumer() {
@Override
public void accept(@NonNull Object o) throws Exception {
if(o.equals("0"))
openning.onNext(o);
}
});
Observable<String> observableA = Observable.interval(1, TimeUnit.SECONDS).map(value -> String.valueOf(value % 10));
observableA.subscribe(openning);
// TODO: Buffer by boundary
observableA = observableA.buffer(openning, new Function<String, Observable<List<String>>>() {
@Override
public Observable<List<String>> apply(@NonNull String o) throws Exception {
list.add(o);
if (o.equals("0")) {
return Observable.just(list);
} else {
list.add(o);
sb.append(o);
return Observable.never();
}
}
}, new Callable() {
@Override
public Object call() throws Exception {
return …Run Code Online (Sandbox Code Playgroud) 我在互联网上看到了许多示例/教程,即使是在stackoverflow上使用Retrofit和RxJava从后端服务器获取数据,但我无法理解每种技术的目的是什么.
如果有人能解释我,我将不胜感激.谢谢!
我正在使用 WorkManager (Android Jetpack) 和 Rx 开发一个 Android 应用程序。下面是 Worker 类。
class ImageRxWorker(
appContext: Context,
private val workerParams: WorkerParameters
) : RxWorker(appContext, workerParams) {
override fun createWork(): Single<Result> = Single.create<Result> { emitter -
// do the job
emitter.onSuccess(Result.success())
}
}
Run Code Online (Sandbox Code Playgroud)
它工作正常,没有问题。但我想知道的是我该如何处理结果?
class MainPresenter(
private val view: MainActivity,
private val workManager: WorkManager = WorkManager.getInstance()
) : MainContract.Presenter {
override fun startWork(): Completable {
view.showToastMessage(R.string.worker_started)
return Completable.create { emitter ->
val uploadWorkRequest = OneTimeWorkRequestBuilder<ImageRxWorker>().build()
workManager.enqueue(uploadWorkRequest)
emitter.onComplete() // This is not exit immediately. …Run Code Online (Sandbox Code Playgroud) 声明PublishSubject在RxJava中不是线程安全的.好.
我试图找到任何一个例子,我试图构建任何一个例子来模拟竞争条件,这会导致不必要的结果.但我不能:(
任何人都可以提供一个证明PublishSubject不是线程安全的例子吗?
我正在使用Rxjava2,
我flatmap在RxJava中使用以下结构:
Observable1.flatmap() 并返回Observable 2.
如下代码:
getApi().createUser(os, deviceToken)
.compose(view.regisObserver())
.subscribeOn(Schedulers.io())
.flatMap(result -> {
String user_token = result.data.user_token;
getPreferenceStore().setAuthToken(user_token);
setReadPolicy();
Observable<ObjectDto<UserProfile>> obs = getApi().updateProfile(null, null, null);
obs.compose(view.regisObserver());
return obs;
})
.subscribe(result-> {
getPreferenceStore().setUserId(result.data.user_id);
// view.onUpdateProfile();
}, Throwable::printStackTrace);
@Override
public <T> ObservableTransformer<T, T> regisObserver() {
return observable -> observable.compose(prepare())
.doOnSubscribe(disposable -> showProgressDialog())
.doOnComplete(this::closeProgressDialog)
.doOnError(throwable -> {
if (BuildConfig.DEBUG) {
throwable.printStackTrace();
}
showProgressDialog();
closeProgressDialog();
});
}
Run Code Online (Sandbox Code Playgroud)
代码编译没有错误.它在运行时出错NetworkErrorOnMainThread.我不知道如何解决它.