标签: rx-java2

使用rxjava2遍历arraylist

我有一个包含一组对象的arraylist。我想使用rxjava,这样我就可以通过onSubscribe方法循环遍历列表,而不是一次获取整个列表,而是一次获取每个列表项

java android arraylist rx-java rx-java2

0
推荐指数
1
解决办法
1425
查看次数

订阅被清除时,处理Observable.fromCallable()中的异常

我有一个情况,一个长时间运行的进程被包装在一个Observable.fromCallable().这个过程是一个OkHttp调用,如果终止,将抛出一个IOException.如果订阅了observable,则将一次性存储在a中CompositeDisposable,并按预期处理异常.但是,CompositeDisposable在某些情况下,我的代码将清除,在OkHttp没有错误处理的情况下触发线程终止,导致应用程序因未处理的异常而崩溃.这是这个问题的简单单元测试示例:

@Test
public void test(){
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    Observable<Object> o = Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("sleeping - this sleep will be interrupted when compositeDisposable gets cleared");
            Thread.sleep(3000);
            return null;
        }
    });
    compositeDisposable.add(o.subscribeOn(new IoScheduler()).subscribe());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    compositeDisposable.clear();
}
Run Code Online (Sandbox Code Playgroud)

有没有办法解决这个问题?

android rx-java rx-java2

0
推荐指数
1
解决办法
1507
查看次数

Rxjava 2 Maybe.toSingle错误

为什么可能.toSingle()抛出错误而没有这样的元素?我试图处理doOnError,但不起作用!

     Single<Integer> singleOdd = Single.just(1);
    Single<Integer> singleEven = Single.just(2);
      Single.concat(singleOdd.filter(integer -> integer%2 ==0).toSingle(),singleEven).doOnError(throwable -> throwable.printStackTrace()).subscribe();
Run Code Online (Sandbox Code Playgroud)

java rx-java2

0
推荐指数
1
解决办法
1405
查看次数

为什么我的RxJava计时器应用程序没有终止?

我有一个使用RxJava 2的简单应用程序:

public static void main(final String[] args) {

    final Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());

    final Observable<String> ticker = Observable.interval(1L, TimeUnit.SECONDS)
        .take(10)
        .subscribeOn(scheduler)
        .map(x -> x + "s");

    ticker.subscribe(x -> {
        System.out.println(x);
    });
}
Run Code Online (Sandbox Code Playgroud)

它正确打印定时器10次:

0s
1s
2s
3s
4s
5s
6s
7s
8s
9s
Run Code Online (Sandbox Code Playgroud)

但是,应用程序之后不会终止9s.似乎有一些线程让它保持活力.

我应该如何实现这一点,以便应用程序在ticker完成后终止?

java multithreading rx-java rx-java2

0
推荐指数
1
解决办法
168
查看次数

当我请求相同的url时,如何在retrofix2.0(rxandroid)中取消请求

我想在用户点击按钮时调用请求.但是当第一次请求没有响应时,用户想要更改参数并使用相同的URL调用新请求.

问题 如何取消第一个请求并使用相同的URL调用新请求.

 ServiceFactory
            .createService()
            .getSomething(page)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DefaultObserver<SomthingResponse>() {
                @Override
                public void onNext(@NonNull SomthingResponse response) {
                    showSomthing()
                    hideProgressDialog();

                }

                @Override
                public void onError(@NonNull Throwable e) {
                    hideProgressDialog();
                    showErrorMessage(e.getMessage());
                }

                @Override
                public void onComplete() {

                }
            });
Run Code Online (Sandbox Code Playgroud)

当用户点击按钮时,用户会将页面参数发送到查询字符串,但此请求尚未完成.用户将更改为页面参数并发送新请求.我想取消第一个请求.

谢谢

android rx-java rx-android retrofit2 rx-java2

0
推荐指数
1
解决办法
403
查看次数

列表中的每个项目的RxJava在Android中使用Retofit发出api请求

我的Android项目中存在以下问题:

我有一个使用Retrofit从上一个网络调用获得的元素列表(ArrayList)。CategoryItem看起来像这样:

public class CategoryItem {
   String id;
   String name;
   public CategoryItem(String id, String name) {
     this.id = id;
     this.name = name;
   }
   public String getId() {
     return id;
   }
   public String getName() {
     return name;
   }
}
Run Code Online (Sandbox Code Playgroud)

实际上,类别由20个元素组成。现在,我需要制作一个网络API,并获取每个类别的产品列表。为此,产品API将类别中的ID作为查询参数。在获得某种类别的产品列表之后,我将其添加到内部SQLite DB中。

我想使用RxJava(1和2)做到这一点。

我到目前为止所做的不是多线程的,而是在MainThread上的,这是不正确的。我将在此处添加代码段:

for (int i = 0; i < categoryItems.size(); i++) {
    CategoryItem categoryItem = categoryItems.get(i);
    final String iCat = categoryItem.getId();
    Observable<ProductResponse> call = networkManager.getApiServiceProductsRx().getProductsRx(iCat);

    Subscription subscription = call
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<ProductResponse>() {
                @Override
                public void onCompleted() {
                } …
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-java2

0
推荐指数
1
解决办法
4253
查看次数

如何更改Observable.interval的周期,以及如何在运行时停止和恢复Observable滴答

有没有办法在运行时更改Observable.interval周期?有没有一种方法可以停止和恢复Observable.interval节拍?有没有办法重置间隔时间?

实际上,我正在使用以下代码在一段时间内永远执行某项操作,但是在运行期间我无法对其进行控制,因此必须在运行时停止,恢复,休息和更改周期。

Observable.interval(8, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.i("TAG", "onSubscribe");
                        }

                        @Override
                        public void onNext(Long aLong) {
                            myMethod();
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.i("TAG", "onError");
                        }

                        @Override
                        public void onComplete() {
                            Log.i("TAG", "onComplete");
                        }
                    });
Run Code Online (Sandbox Code Playgroud)

我尝试用Google搜索它以找到一种解决方案,但不幸的是我没有找到任何解决方案,如果有的话,我需要帮助或资源。

android rx-android rx-java2

0
推荐指数
1
解决办法
633
查看次数

无法解析方法SubscribeOn-RxJava

我对RxJava非常陌生,我正在尝试使用RxJava进行Retrofit调用。当我在SubscribeOn上编写此代码时,它说“无法解析方法SubscribeOn(io.reactivex.scheduler)”。

你能指导我做错了什么吗?

谢谢R

Presenter层中的getDemoData。

  void getDemoData(){
        mCompositeDisposable.add(apiInterface.getDemoData()
                //subscribeOn has the error.
                .subscribeOn(Schedulers.io()) // "work" on io thread
                .observeOn(AndroidSchedulers.mainThread()) // "listen" on UIThread
                .map(new Function<IApiCalls, List<DemoJSONAPIData>>() {
                    @Override
                    public List<DemoJSONAPIData> apply(
                            @io.reactivex.annotations.NonNull final IApiCalls apiCalls)
                            throws Exception {
                        // we want to have the geonames and not the wrapper object
                        return apiCalls.getDemoData();
                    }
                })
                .subscribe(new Consumer<List<Geoname>>() {
                    @Override
                    public void accept(
                            @io.reactivex.annotations.NonNull final List<Geoname> geonames)
                            throws Exception {
                        //display
                    }
                })
        );
    }
Run Code Online (Sandbox Code Playgroud)

Api接口

public interface ApiInterface {
    @GET("/posts")
    //Single<DemoJSONAPIData> getDemoData(); …
Run Code Online (Sandbox Code Playgroud)

android retrofit reactivex rx-java2

0
推荐指数
1
解决办法
2534
查看次数

使用Kotlin和rx-java combineLatest与表单验证混淆

所以我想用rx-java2进行表单验证.我正在使用Kotlin.我遇到了两个问题.emailObservable和passwordObservable都是类型Disposable!.我尝试通过调用指定类型,val emailObservable: Observable<Boolean>但Android Studio认为它Disposable!.

其次,当我想使用方法时combineLatest出现错误:使用提供的参数不能调用以下任何函数.

emailObservable和passwordObservable都能正常工作.我是rx-java的新手,我对这种类型的东西很困惑.

val emailObservable = RxTextView.afterTextChangeEvents(textEmail)
                .observeOn(AndroidSchedulers.mainThread())
                .map { x -> textEmail.text.length > 3 }
                .subscribe { x -> foo(x) }

val passwordObservable =RxTextView.afterTextChangeEvents(textPassword)
                .observeOn(AndroidSchedulers.mainThread())
                .map { x -> textPassword.text.length > 5 }
                .subscribe { x -> foo(x) }


Observable.combineLatest(emailObservable,
passwordObservable,
BiFunction { x: Boolean, y:Boolean -> x && y })
Run Code Online (Sandbox Code Playgroud)

kotlin rx-java rx-android rx-binding rx-java2

0
推荐指数
1
解决办法
402
查看次数

How to get index / position of list in onnext from Observable.fromIterable Rxjava?

I'm trying to loop a list but i'm not getting the index of current item.

Observable.fromIterable(yourList).observeOn(Schedulers.io())
            .observeOn(Schedulers.io()).subscribe(
                { item -> {        }},
                {_ ->{}},
                {->{}}
Run Code Online (Sandbox Code Playgroud)

Is there any way to get index just like

yourList.forEachIndexed{ index, item -> }
Run Code Online (Sandbox Code Playgroud)

I already know that

class Indexed {
int index;
String element;

Indexed(int index, String element) {
    this.index = index;
    this.element = element;
    }
}
Run Code Online (Sandbox Code Playgroud)

this can be used as a solution. But i don't like this kind of approach. I want …

android kotlin rx-java rx-java2

0
推荐指数
1
解决办法
175
查看次数