我有一个包含一组对象的arraylist。我想使用rxjava,这样我就可以通过onSubscribe方法循环遍历列表,而不是一次获取整个列表,而是一次获取每个列表项
我有一个情况,一个长时间运行的进程被包装在一个Observable.fromCallable().这个过程是一个OkHttp调用,如果终止,将抛出一个IOException.如果订阅了observable,则将一次性存储在a中CompositeDisposable,并按预期处理异常.但是,CompositeDisposable在某些情况下,我的代码将清除,在OkHttp没有错误处理的情况下触发线程终止,导致应用程序因未处理的异常而崩溃.这是这个问题的简单单元测试示例:
@Test
public void test(){
CompositeDisposable compositeDisposable = new CompositeDisposable();
Observable<Object> o = Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("sleeping - this sleep will be interrupted when compositeDisposable gets cleared");
Thread.sleep(3000);
return null;
}
});
compositeDisposable.add(o.subscribeOn(new IoScheduler()).subscribe());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
compositeDisposable.clear();
}
Run Code Online (Sandbox Code Playgroud)
有没有办法解决这个问题?
为什么可能.toSingle()抛出错误而没有这样的元素?我试图处理doOnError,但不起作用!
Single<Integer> singleOdd = Single.just(1);
Single<Integer> singleEven = Single.just(2);
Single.concat(singleOdd.filter(integer -> integer%2 ==0).toSingle(),singleEven).doOnError(throwable -> throwable.printStackTrace()).subscribe();
Run Code Online (Sandbox Code Playgroud) 我有一个使用RxJava 2的简单应用程序:
public static void main(final String[] args) {
final Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());
final Observable<String> ticker = Observable.interval(1L, TimeUnit.SECONDS)
.take(10)
.subscribeOn(scheduler)
.map(x -> x + "s");
ticker.subscribe(x -> {
System.out.println(x);
});
}
Run Code Online (Sandbox Code Playgroud)
它正确打印定时器10次:
0s
1s
2s
3s
4s
5s
6s
7s
8s
9s
Run Code Online (Sandbox Code Playgroud)
但是,应用程序之后不会终止9s.似乎有一些线程让它保持活力.
我应该如何实现这一点,以便应用程序在ticker完成后终止?
我想在用户点击按钮时调用请求.但是当第一次请求没有响应时,用户想要更改参数并使用相同的URL调用新请求.
问题 如何取消第一个请求并使用相同的URL调用新请求.
ServiceFactory
.createService()
.getSomething(page)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DefaultObserver<SomthingResponse>() {
@Override
public void onNext(@NonNull SomthingResponse response) {
showSomthing()
hideProgressDialog();
}
@Override
public void onError(@NonNull Throwable e) {
hideProgressDialog();
showErrorMessage(e.getMessage());
}
@Override
public void onComplete() {
}
});
Run Code Online (Sandbox Code Playgroud)
当用户点击按钮时,用户会将页面参数发送到查询字符串,但此请求尚未完成.用户将更改为页面参数并发送新请求.我想取消第一个请求.
谢谢
我的Android项目中存在以下问题:
我有一个使用Retrofit从上一个网络调用获得的元素列表(ArrayList)。CategoryItem看起来像这样:
public class CategoryItem {
String id;
String name;
public CategoryItem(String id, String name) {
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public String getName() {
return name;
}
}
Run Code Online (Sandbox Code Playgroud)
实际上,类别由20个元素组成。现在,我需要制作一个网络API,并获取每个类别的产品列表。为此,产品API将类别中的ID作为查询参数。在获得某种类别的产品列表之后,我将其添加到内部SQLite DB中。
我想使用RxJava(1和2)做到这一点。
我到目前为止所做的不是多线程的,而是在MainThread上的,这是不正确的。我将在此处添加代码段:
for (int i = 0; i < categoryItems.size(); i++) {
CategoryItem categoryItem = categoryItems.get(i);
final String iCat = categoryItem.getId();
Observable<ProductResponse> call = networkManager.getApiServiceProductsRx().getProductsRx(iCat);
Subscription subscription = call
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<ProductResponse>() {
@Override
public void onCompleted() {
} …Run Code Online (Sandbox Code Playgroud) 有没有办法在运行时更改Observable.interval周期?有没有一种方法可以停止和恢复Observable.interval节拍?有没有办法重置间隔时间?
实际上,我正在使用以下代码在一段时间内永远执行某项操作,但是在运行期间我无法对其进行控制,因此必须在运行时停止,恢复,休息和更改周期。
Observable.interval(8, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TAG", "onSubscribe");
}
@Override
public void onNext(Long aLong) {
myMethod();
}
@Override
public void onError(Throwable e) {
Log.i("TAG", "onError");
}
@Override
public void onComplete() {
Log.i("TAG", "onComplete");
}
});
Run Code Online (Sandbox Code Playgroud)
我尝试用Google搜索它以找到一种解决方案,但不幸的是我没有找到任何解决方案,如果有的话,我需要帮助或资源。
我对RxJava非常陌生,我正在尝试使用RxJava进行Retrofit调用。当我在SubscribeOn上编写此代码时,它说“无法解析方法SubscribeOn(io.reactivex.scheduler)”。
你能指导我做错了什么吗?
谢谢R
Presenter层中的getDemoData。
void getDemoData(){
mCompositeDisposable.add(apiInterface.getDemoData()
//subscribeOn has the error.
.subscribeOn(Schedulers.io()) // "work" on io thread
.observeOn(AndroidSchedulers.mainThread()) // "listen" on UIThread
.map(new Function<IApiCalls, List<DemoJSONAPIData>>() {
@Override
public List<DemoJSONAPIData> apply(
@io.reactivex.annotations.NonNull final IApiCalls apiCalls)
throws Exception {
// we want to have the geonames and not the wrapper object
return apiCalls.getDemoData();
}
})
.subscribe(new Consumer<List<Geoname>>() {
@Override
public void accept(
@io.reactivex.annotations.NonNull final List<Geoname> geonames)
throws Exception {
//display
}
})
);
}
Run Code Online (Sandbox Code Playgroud)
Api接口
public interface ApiInterface {
@GET("/posts")
//Single<DemoJSONAPIData> getDemoData(); …Run Code Online (Sandbox Code Playgroud) 所以我想用rx-java2进行表单验证.我正在使用Kotlin.我遇到了两个问题.emailObservable和passwordObservable都是类型Disposable!.我尝试通过调用指定类型,val emailObservable: Observable<Boolean>但Android Studio认为它Disposable!.
其次,当我想使用方法时combineLatest出现错误:使用提供的参数不能调用以下任何函数.
emailObservable和passwordObservable都能正常工作.我是rx-java的新手,我对这种类型的东西很困惑.
val emailObservable = RxTextView.afterTextChangeEvents(textEmail)
.observeOn(AndroidSchedulers.mainThread())
.map { x -> textEmail.text.length > 3 }
.subscribe { x -> foo(x) }
val passwordObservable =RxTextView.afterTextChangeEvents(textPassword)
.observeOn(AndroidSchedulers.mainThread())
.map { x -> textPassword.text.length > 5 }
.subscribe { x -> foo(x) }
Observable.combineLatest(emailObservable,
passwordObservable,
BiFunction { x: Boolean, y:Boolean -> x && y })
Run Code Online (Sandbox Code Playgroud) I'm trying to loop a list but i'm not getting the index of current item.
Observable.fromIterable(yourList).observeOn(Schedulers.io())
.observeOn(Schedulers.io()).subscribe(
{ item -> { }},
{_ ->{}},
{->{}}
Run Code Online (Sandbox Code Playgroud)
Is there any way to get index just like
yourList.forEachIndexed{ index, item -> }
Run Code Online (Sandbox Code Playgroud)
I already know that
class Indexed {
int index;
String element;
Indexed(int index, String element) {
this.index = index;
this.element = element;
}
}
Run Code Online (Sandbox Code Playgroud)
this can be used as a solution. But i don't like this kind of approach. I want …