RxJava2就像Android中的AsyncTask一样

Inv*_*ame 6 android reactive-programming rx-java rx-java2

嗨,我刚开始学习使用RxJava2进行反应式编程.如何创建在后台线程中运行的任务,然后使用RxJava2在主线程上完成.

在Android中我们使用AsyncTask的示例就像下面的示例

private class MyTask extends AsyncTask<String, Integer, Boolean>
{
    @Override
    protected Boolean doInBackground(String... paths)
    {
        for (int index = 0; index < paths.length; index++)
        {
            boolean result = copyFileToExternal(paths[index]);

            if (result == true)
            {
                // update UI
                publishProgress(index);
            }
            else
            {
                // stop the background process
                return false;
            }
        }

        return true;
    }

    @Override
    protected void onProgressUpdate(Integer... values)
    {
        super.onProgressUpdate(values);
        int count = values[0];
        // this will update my textview to show the number of files copied
        myTextView.setText("Total files: " + count);
    }

    @Override
    protected void onPostExecute(Boolean result)
    {
        super.onPostExecute(result);
        if (result)
        {
            // display a success dialog
            ShowSuccessAlertDialog();
        }
        else
        {
            // display a fail dialog
            ShowFailAlertDialog();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

对于这个例子,我想传入一个字符串的Array/ArrayList,它用于在后台线程中执行一些方法.然后每个成功结果都将更新我的TextView(UI线程).如果其中一个进程失败,我希望它直接停止.最后,我想在流程完成后更新我的视图.

我只能做到这一点

Observable.just(paths).subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<ArrayList<String>>()
            {
                private boolean result;

                @Override
                public void onSubscribe(Disposable d)
                {
                }

                @Override
                public void onNext(ArrayList<String> paths)
                {
                    for (int index = 0; index < paths.size(); index++)
                    {
                        result = copyFileToExternal(paths[index]);

                        if (result == true)
                        {
                            // cant update UI because is in background thread
                            myTextView.setText("Total files: " + index);
                        }
                        else
                        {
                            // end the loop
                            break;
                        }
                    }
                }

                @Override
                public void onError(Throwable e)
                {
                }

                @Override
                public void onComplete()
                {
                    if (result)
                    {
                        // cant display because it is still in background thread
                        ShowSuccessAlertDialog();
                    }
                    else
                    {
                        // cant display because it is still in background thread
                        ShowFailAlertDialog();
                    }
                }
            });
Run Code Online (Sandbox Code Playgroud)

我看了几个教程,但似乎无法找到答案.

在此先感谢您的帮助

GVi*_*i82 5

我会做这样的事情:

Observable.fromArray(getPaths())
    .map(path -> copyFileToExternal(path))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(aInteger -> Log.i("test", "update UI"),
               throwable -> ShowFailAlertDialog),
               () -> ShowSuccessAlertDialog());
Run Code Online (Sandbox Code Playgroud)

一个好主意通常是有一个“处理程序”来控制对观察者的订阅。这样,当您需要停止后台任务时(例如因为用户离开了 Activity),您可以使用它。为此,您可以使用subscribeWith代替subscribe, 接收作为输入 a ResourceObserver: 这样您就可以得到 a Disposable

Disposable subscription = Observable.fromArray(getPaths())
    .map(path -> copyFileToExternal(path))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new ResourceObserver<Integer>() {
         @Override
         public void onNext(@NonNull Integer index) {
             Log.i("test", "update UI");
         }
         @Override
         public void onError(@NonNull Throwable e) {
             ShowFailAlertDialog();
         }
         @Override
         public void onComplete() {
             ShowSuccessAlertDialog();
         }
   });
Run Code Online (Sandbox Code Playgroud)

当您需要停止任务时,您可以调用:

subscription.dispose();
Run Code Online (Sandbox Code Playgroud)