标签: rx-java2

RxJava2就像Android中的AsyncTask一样

嗨,我刚开始学习使用RxJava2进行反应式编程.如何创建在后台线程中运行的任务,然后使用RxJava2在主线程上完成.

在Android中我们使用AsyncTask的示例就像下面的示例

private class MyTask extends AsyncTask<String, Integer, Boolean>
{
    @Override
    protected Boolean doInBackground(String... paths)
    {
        for (int index = 0; index < paths.length; index++)
        {
            boolean result = copyFileToExternal(paths[index]);

            if (result == true)
            {
                // update UI
                publishProgress(index);
            }
            else
            {
                // stop the background process
                return false;
            }
        }

        return true;
    }

    @Override
    protected void onProgressUpdate(Integer... values)
    {
        super.onProgressUpdate(values);
        int count = values[0];
        // this will update my textview to show the number of files copied
        myTextView.setText("Total files: …
Run Code Online (Sandbox Code Playgroud)

android reactive-programming rx-java rx-java2

6
推荐指数
1
解决办法
3652
查看次数

测试RxJava2可流动查询室

我一直在尝试为我的Room数据库编写单元测试,我以前在查询返回列表时已经完成了,我创建了一个允许主线程查询的数据库,但现在我正在尝试测试RxJava值我没有运气.

这是我的DAO代码:

@Dao
interface AccountDAO {
    @Query("SELECT * FROM account")
    fun getAll(): Flowable<List<Account>>

    @Insert
    fun insert(accounts: List<Account>): List<Long>

    //...
}
Run Code Online (Sandbox Code Playgroud)

这是我的数据库测试代码,我已经尝试了一些事情来让它通过:

@RunWith(AndroidJUnit4::class)
class CCDatabaseTest {
    //...

    @JvmField @Rule val mainActivity = ActivityTestRule<MainActivity>(MainActivity::class.java)

    @Before
    fun setUp() {
        val context = mainActivity.activity
        database = Room.inMemoryDatabaseBuilder(context, CCDatabase::class.java).allowMainThreadQueries().build()
        accountDao = database.accountDao()
        transactionDao = database.transactionDao()
    }

    //...

    @Test
    fun testWriteReadAccount() {
        val testAccount = Account(TEST_ACCOUNT_NAME, TEST_ACCOUNT_BALANCE)

        val ids = accountDao.insert(listOf(testAccount))
        assertEquals(1, ids.size)

        val accountsFlowable = accountDao.getAll()
        val testSubscriber = TestSubscriber<List<Account>>()
        accountsFlowable.subscribe(testSubscriber)

        testSubscriber.assertNoErrors()
        // Fails: testSubscriber.assertValueCount(1) …
Run Code Online (Sandbox Code Playgroud)

kotlin rx-java2 android-room

6
推荐指数
1
解决办法
1651
查看次数

在处理时停止rxJava可观察链执行

尽管调试rxJava网络电话在一个应用程序我碰到这样一种情况,如果我们disposeclear处置对象通过链的订阅返回observables那么只有第一个observable得到处理而不是其他链接observablesflatMap.

看看下面的演示代码片段:

CompositeDisposable testCompositeDisposal = new CompositeDisposable();

private void testLoadData() {
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "Second: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    })).doOnNext(value -> {
        Log.w("Debug: ", "doONNext"); …
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-java2

6
推荐指数
1
解决办法
4928
查看次数

使用RxJava2和Retrofit2时如何访问响应头?

前提

我正在开发一个简单的应用程序,我想在RecyclerView中列出用户的GitHub存储库.我在构建它时使用作为我的端点.

问题

我面临的问题是GitHub API一次只返回30个repos.为了获得更多,我可以per_page=100在我的查询字符串中附加一个(100是最大值); 但是,我们如何处理超过100个回购的用户?

API 文档提供的解决方案是next;从"链接"响应标头获取URL以进行第二次API调用.

怎么会这样呢?谢谢!

android github-api retrofit2 clean-architecture rx-java2

6
推荐指数
1
解决办法
1545
查看次数

rxjava2 - 如果在Maybe上的话

我正在寻找rxjava2中推荐的做法,以处理一个可流动导致条件行为的情况.

更具体地说,我有一个Maybe<String>我想要String在数据库上更新的内容,如果String存在,或者,如果它不存在,我想创建一个新的String并将其保存在数据库中.

我想到了下面但很明显它不是我想要的:

Maybe<String> source = Maybe.just(new String("foo")); //oversimplified source
source.switchIfEmpty(Maybe.just(new String("bar"))).subscribe(result -> 
System.out.println("save to database "+result));
source.subscribe(result -> System.out.println("update result "+result));
Run Code Online (Sandbox Code Playgroud)

以上显然产生了

save to database foo
update result foo
Run Code Online (Sandbox Code Playgroud)

我也试过下面给出了预期的结果,但仍觉得它......很奇怪.

Maybe<String> source = Maybe.just(new String("foo")); //oversimplified source
source.switchIfEmpty(Maybe.just(new String("bar")).doOnSuccess(result -> 
System.out.println("save to database "+result))).subscribe();
source.doOnSuccess(result -> System.out.println("update result "+result)).subscribe();
Run Code Online (Sandbox Code Playgroud)

如何在结果存在时以及何时不存在时执行操作?该用例应该如何在rxjava2中处理?

更新01

我尝试了以下,它看起来比我上面提到的更清洁.注意确定推荐使用rxjava2但是......

Maybe.just(new String("foo"))
     .map(value -> Optional.of(value))
     .defaultIfEmpty(Optional.empty())
     .subscribe(result -> {
         if(result.isPresent()) {
             System.out.println("update result "+result);
         }
         else {
             System.out.println("save to …
Run Code Online (Sandbox Code Playgroud)

java rx-java2

6
推荐指数
1
解决办法
2945
查看次数

显示对话框后,使用Retrofit 2和RxJava2重试呼叫

我正在使用Retrofit 2和RxJava2调用API。如果呼叫失败,在某些情况下(例如,没有Internet连接),我想向用户显示错误对话框,然后让他重试。

当我使用RxJava时,我正在考虑使用它,.retryWhen(...)但是我不知道该怎么做,因为它需要等待用户按下对话框上的按钮。

目前,我显示该对话框,但在用户按下任何按钮之前会重试。另外,我希望在用户按下“取消”时不重试该呼叫。

这是我目前的代码:

private void displayDialog(DialogInterface.OnClickListener positive, DialogInterface.OnClickListener negative) {
    AlertDialog.Builder builder = new AlertDialog.Builder(MainActivity.this);
    builder.setMessage("Unexpected error, do you want to retry?")
            .setPositiveButton("Retry", positive)
            .setNegativeButton("Cancel", negative)
            .show();
}

private Observable<Boolean> notifyUser() {
    final PublishSubject<Boolean> subject = PublishSubject.create();
    displayDialog(
            (dialogInterface, i) -> subject.onNext(true),
            (dialogInterface, i) -> subject.onNext(false)
    );

    return subject;
}

private void onClick() {
    Log.d(TAG, "onClick");
    getData()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .retryWhen(attempts -> {
                return attempts.zipWith(
                        notifyUser(),
                        (throwable, res) -> res);
            })
            .subscribe(
                    s -> {
                        Log.d(TAG, "success"); …
Run Code Online (Sandbox Code Playgroud)

android rx-java2 retrywhen

6
推荐指数
1
解决办法
3057
查看次数

RxJava/RxJs:如何合并两个源可观察对象,但只要其中一个完成就完成

我有两个源可观察量.我想合并两个源可观察对象,但是只要其中一个源可观察对象完成,合并的可观察对象就会完成.

期望的行为:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4--x
Run Code Online (Sandbox Code Playgroud)

如果其中一个源出现错误,则错误应传播到合并的observable:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged"  ---1---2----3--4--ex
Run Code Online (Sandbox Code Playgroud)

"merge"运算符仅在两个源完成时完成合并流:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4-----------------------------x
Run Code Online (Sandbox Code Playgroud)

我怎样才能实现我想要的行为?

rxjs rx-java rx-java2

6
推荐指数
1
解决办法
1448
查看次数

如何使用RxJava 2 + Retrofit 2进行发布?

这个问题可能听起来很简单,但我很难过.

我可以通过这种方式进行改造2:

class RetrofitClient {

    private static Retrofit retrofit = null;

    static Retrofit getClient(String baseUrl) {
        if (retrofit == null) {
            retrofit = new Retrofit.Builder()
                    .baseUrl(baseUrl)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }

}
Run Code Online (Sandbox Code Playgroud)

Api服务界面:

 @POST("postsInit")
    @FormUrlEncoded
    Call<InitPost> postInit(
            @Field("appVersion") String versionName,
            @Field("appId") String applicationId,

    );
Run Code Online (Sandbox Code Playgroud)

最后:

apiService.postInit(versionName, applicationId).enqueue(new Callback<InitPost>() {
            @Override
            public void onResponse(@NonNull Call<InitPost> call, @NonNull Response<InitPost> response) {

                if (response.isSuccessful()) {
                    Timber.d("post submitted to API");
                    getInitResponse();
                }
            }

            @Override
            public void onFailure(@NonNull Call<InitPost> call, @NonNull Throwable t) …
Run Code Online (Sandbox Code Playgroud)

android rx-java retrofit2 rx-java2

6
推荐指数
1
解决办法
4068
查看次数

Android:存储库模式和分页

最近,我已经了解了在设计应用程序的后端(存储库,而不是服务器端后端)时拥有单一来源(SSOT)的重要性.https://developer.android.com/topic/libraries/architecture/guide.html

通过开发新闻提要应用程序(使用awesome https://newsapi.org/),我试图了解有关应用程序架构的更多信息.但是,我不确定如何实现分页.顺便说一下,.:我正在使用MVVM作为我的表示层.View订阅了ViewModel的LiveData.ViewModel订阅了RxJava流.

我提出了以下方法:

interface NewsFeedRepository {
        fun getNewsFeed(): Observable<List<Article>>
        fun refreshFeed(): Completable
        fun loadMore(): Completable
}
Run Code Online (Sandbox Code Playgroud)

ViewModel订阅getNewsFeed()Observable,它在每次数据库中的基础数据(SSOT)发生更改时发出数据.但是,我的问题是关于这个loadMore()方法.此方法尝试从API加载更多文章,并在成功时将它们插入本地数据库.这会导致getNewsFeed()发出新的Feed数据.

我的问题:

1.存储库是否应负责同步API和本地数据库数据?或者存储库是否应该使用一些管理同步网络/本地数据的"低级"API?

2.该方法loadMore返回一个Completable看似奇怪/误导的方法.是否有更好的方法在存储库界面中指示分页?

3.分页意味着存储当前页数并在从API请求数据时使用它.应该在哪里存储当前页数?在存储库中?或者在存储库使用的某些"低级"组件中?

interface SearchArticleRepository {
    fun searchArticles(sources: List<NewsSource>? = null, query: String? = null): Flowable<List<Article>>
    fun moreArticles(): Completable  
}
Run Code Online (Sandbox Code Playgroud)

此存储库用于搜索文章.该方法moreArticles()尝试根据最后一次调用从API加载更多文章searchArticles(...).例如:ViewModel调用repo.searchArticles(q = "bitcoin").调用repo.moreArticles()尝试"bitcoin"从API 加载包含查询字符串的更多文章并与本地数据库同步.

我的问题再一次:

1.是否可以存储有关上一个请求的信息(searchArticles(request)在存储库中?我也可以考虑传递请求参数moreArticles(),例如moreArticles(sources: List<NewsSource>? = null, query: …

android kotlin rx-java2

6
推荐指数
0
解决办法
930
查看次数

在主线程上调用doOnSubscribe

在阅读了多篇博客文章和文档后,我得出结论,以下doOnSubscribe将在工作线程上执行:

Observable.just(1)
            .observeOn(Schedulers.io())
            .doOnSubscribe(__ -> Log.d("Testing", "Testing")) // Shouldn't this be on worker thread?
            .subscribe();
Run Code Online (Sandbox Code Playgroud)

但是在调试之后,我看到doOnSubscribe在主线程上执行了.我认为doOnSubscribe它与其他运算符类似,因此在与subscribeOn和相结合时具有类似的线程行为observeOn.

我错过了什么?如何将doOnSubscribe执行移至后台线程?

android reactive-programming rx-java2

6
推荐指数
2
解决办法
1480
查看次数