如何在 RXJava Android 中对后台线程执行长时间计算

Ehs*_*jum 3 java multithreading android rx-java

我想在 android 中使用 RXJava 在后台线程上执行长时间计算。经过计算,我试图在 Recylerview 中呈现结果。我正在使用以下代码:

Observable.just("true")
                .subscribeOn(Schedulers.io())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        feedlist.clear();
                        if (eventFeedItems != null && !eventFeedItems.isEmpty()) {
                            for (int i = 0; i < eventFeedItems.size(); i++) {
                                if (eventFeedItems != null && eventFeedItems.get(i) != null
                                        && ((eventFeedItems.get(i).getType() != null && eventFeedItems.get(i).getType().equalsIgnoreCase("EVENT"))
                                        || (eventFeedItems.get(i).getActivityRequestType() != null && eventFeedItems.get(i).getActivityRequestType().equalsIgnoreCase(EventConstants.TRENDING_ACTIVITY)))) {
                                    if (eventFeedItems.get(i).getActivityRequestType() != null && !eventFeedItems.get(i).getActivityRequestType().equalsIgnoreCase("")) {
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), eventFeedItems.get(i).getActivityRequestType(), null));
                                    } else if (eventFeedItems.get(i).getRequestType() != null && !eventFeedItems.get(i).getRequestType().equalsIgnoreCase("")) {
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), eventFeedItems.get(i).getRequestType(), null));
                                    } else
                                        feedlist.add(new FeedsListModel(eventFeedItems.get(i), EventConstants.ATTENDEE_POST, null));
                                }
                            }
                        }
                        Log.d("calculations","Completed");
                        return "";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
//                        feed_list.setLayoutManager(mLayoutManager);
//                        feedListAdapter.notifyDataSetChanged();
                        Log.d("Adapter", "Set");
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("Exception", "oh! fish...");
                        throwable.printStackTrace();
                    }
                });
Run Code Online (Sandbox Code Playgroud)

使用上面的代码,我面临 UI Hindring,因为 ArrayList eventFeedItems 大小大约超过 300 个项目。我是 RXJava 的新手。请帮帮我。

Han*_*rst 5

您将无法使用 map-Operator 实现并发。

第一个 subscribeOn 将所有发射移动到 IO-scheduler。这里没有并发。

.subscribeOn(Schedulers.io())
Run Code Online (Sandbox Code Playgroud)

map-operator 将从前一个线程同步调用。在您的情况下,它将是来自 IO-threadpool 的一些线程。

.map(new Func1<String, String>() {
Run Code Online (Sandbox Code Playgroud)

执行 map-Operator 后,您会将值从 IO 线程移动到 Android-UI-event-loop

.observeOn(AndroidSchedulers.mainThread())
Run Code Online (Sandbox Code Playgroud)

在将值从 IO 线程转换为 UI 线程后,将处理来自初始 observable 的下一个值。

Observable.just("true")
Run Code Online (Sandbox Code Playgroud)

在您的示例中,将不再有值,因为您只生成一个值。

为了实现并发,您应该使用 flatMap 而不是 map。并在 flatMap 中使用 subscribeOn() 在另一个线程上创建每个流。

请考虑这个例子,看看并发是如何发生的。每个 observable 都将被立即订阅,因此最大。测试时间约为 5 秒。如果现在发生并发,则需要 1+2+3+4+5 秒加上执行时间。

@Test
public void name1() throws Exception {

        Observable<Integer> value = Observable.just(1_000, 2_000, 3_000, 4_000, 5_000)
                .flatMap(i -> Observable.fromCallable(() -> doWork(i)).subscribeOn(Schedulers.io())
                ).doOnNext(integer -> System.out.println("value"));

        value.test().awaitTerminalEvent();
}

private int doWork(int sleepMilli) {
        try {
            Thread.sleep(sleepMilli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return -1;
}
Run Code Online (Sandbox Code Playgroud)

如果您想了解更多关于 flatMap 并发如何发生的信息,请考虑阅读http://tomstechnicalblog.blogspot.de/2015/11/rxjava-achieving-parallelization.html

关于您的代码,我建议:

  • 将接口的匿名实现移动到私有内部类实现并使用它的实例。您将获得更具可读性的 observable
  • 不要对来自内部运算符的全局变量使用副作用。如果涉及并发,您将获得竞争条件。

    List<FeedsListModel> eventFeedItems = Arrays.asList(new FeedsListModel(), new FeedsListModel());
    
    Observable<FeedsListModel> feedsListModelObservable = Observable.fromIterable(eventFeedItems)
                        .flatMap(feedsListModel -> Observable.fromCallable(() -> calculation(feedsListModel))
                                .subscribeOn(Schedulers.computation())
    );
    
    feedsListModelObservable
            .toList()
            .observeOn(Schedulers.io())
            .subscribe(feedsListModels -> {
                // do UI stuff
    });
    
    Run Code Online (Sandbox Code Playgroud)

帮助方法:

private FeedsListModel calculation(FeedsListModel model) {
    // do calculation here

    return new FeedsListModel();
}
Run Code Online (Sandbox Code Playgroud)