Schedulers.io创建数百个RxCachedThreadSchedulers

szi*_*ani 3 multithreading android rx-java rx-android

我在Android应用程序中使用RxJava,它多次遇到OutOfMemoryError。我通过设备管理器对其进行了检查,我刚刚发现,我有200多个线程,其中大多数处于等待状态,通常是RxCachedThreadSchedulers。由于线程太多而引发OOMError。我还注意到,如果我按下一个按钮,该按钮将调用服务并获取令牌并将其缓存,则线程数将增加5!

因此,我搜索并发现Schedulers.io可以创建无限线程。当我用Schedulers.computation替换每个Schedulers.io时,问题消失了,但这没有任何意义,因为我使用了Schedulers.io,就像应该使用它一样。

那么,如何使用Schedulers.io并确保它不会创建太多线程?

更新资料

我这样取消订阅:

    final Scheduler.Worker worker = Schedulers.io().createWorker();
    worker.schedule(new Action0() {
        @Override
        public void call() {
            long last = lastServerCommunication.getMillis();
            LongPreference pref = new LongPreference(mSharedPreferences, PREF_KEY_LAST_SERVER_COMMUNICATION);
            pref.set(last);
            worker.unsubscribe();
        }
    });
Run Code Online (Sandbox Code Playgroud)

更新#2

我使用Schedulers.io的常规方法是:

public Observable<Scenario> load() {
    return Observable
            .create(new Observable.OnSubscribe<Scenario>() {
                @Override
                public void call(Subscriber<? super Scenario> subscriber) {
                    try {
                        Scenario scenario = mGson.fromJson(mSharedPreferences.getString("SCENARIO", null), Scenario.class);
                        subscriber.onNext(scenario);
                        subscriber.onCompleted();
                    } catch (Exception e) {
                        subscriber.onError(new Throwable());
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
Run Code Online (Sandbox Code Playgroud)

和:

    mSomeSubscription = mSomeManager.readFromDatabase()
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<List<SomeEntry>>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) {
                    // some logging
                }

                @Override
                public void onNext(List<SomeEntry> Entries) {
                    // Some action
                }
            });
Run Code Online (Sandbox Code Playgroud)

szi*_*ani 5

好吧,我找到了原因。参见说明,更新#2

return Observable.create(new Observable.OnSubscribe<Something>() {
            @Override
            public void call(Subscriber<? super Something> subscriber) {
                try {
                  // Some action
                    subscriber.onNext(scenario);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(new Throwable());
                }
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
Run Code Online (Sandbox Code Playgroud)

当您像这样创建冷的Observable序列时,必须确保在订户上调用onCompleted,请参见上文subscriber.onCompleted();。好吧,它不在代码中的某些地方,所以生成了io线程。

非常感谢akarnokd提供的帮助!