嗨,我刚开始学习使用RxJava2进行反应式编程.如何创建在后台线程中运行的任务,然后使用RxJava2在主线程上完成.
在Android中我们使用AsyncTask的示例就像下面的示例
private class MyTask extends AsyncTask<String, Integer, Boolean>
{
@Override
protected Boolean doInBackground(String... paths)
{
for (int index = 0; index < paths.length; index++)
{
boolean result = copyFileToExternal(paths[index]);
if (result == true)
{
// update UI
publishProgress(index);
}
else
{
// stop the background process
return false;
}
}
return true;
}
@Override
protected void onProgressUpdate(Integer... values)
{
super.onProgressUpdate(values);
int count = values[0];
// this will update my textview to show the number of files copied
myTextView.setText("Total files: …
Run Code Online (Sandbox Code Playgroud) 我一直在尝试为我的Room数据库编写单元测试,我以前在查询返回列表时已经完成了,我创建了一个允许主线程查询的数据库,但现在我正在尝试测试RxJava值我没有运气.
这是我的DAO代码:
@Dao
interface AccountDAO {
@Query("SELECT * FROM account")
fun getAll(): Flowable<List<Account>>
@Insert
fun insert(accounts: List<Account>): List<Long>
//...
}
Run Code Online (Sandbox Code Playgroud)
这是我的数据库测试代码,我已经尝试了一些事情来让它通过:
@RunWith(AndroidJUnit4::class)
class CCDatabaseTest {
//...
@JvmField @Rule val mainActivity = ActivityTestRule<MainActivity>(MainActivity::class.java)
@Before
fun setUp() {
val context = mainActivity.activity
database = Room.inMemoryDatabaseBuilder(context, CCDatabase::class.java).allowMainThreadQueries().build()
accountDao = database.accountDao()
transactionDao = database.transactionDao()
}
//...
@Test
fun testWriteReadAccount() {
val testAccount = Account(TEST_ACCOUNT_NAME, TEST_ACCOUNT_BALANCE)
val ids = accountDao.insert(listOf(testAccount))
assertEquals(1, ids.size)
val accountsFlowable = accountDao.getAll()
val testSubscriber = TestSubscriber<List<Account>>()
accountsFlowable.subscribe(testSubscriber)
testSubscriber.assertNoErrors()
// Fails: testSubscriber.assertValueCount(1) …
Run Code Online (Sandbox Code Playgroud) 尽管调试rxJava网络电话在一个应用程序我碰到这样一种情况,如果我们dispose
或clear
处置对象通过链的订阅返回observables
那么只有第一个observable
得到处理而不是其他链接observables
的flatMap
.
看看下面的演示代码片段:
CompositeDisposable testCompositeDisposal = new CompositeDisposable();
private void testLoadData() {
Disposable disposable = Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "First: " + i);
sbr.onNext(true);
}
sbr.onComplete();
}).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "Second: " + i);
sbr.onNext(true);
}
sbr.onComplete();
})).doOnNext(value -> {
Log.w("Debug: ", "doONNext"); …
Run Code Online (Sandbox Code Playgroud) 我正在寻找rxjava2中推荐的做法,以处理一个可流动导致条件行为的情况.
更具体地说,我有一个Maybe<String>
我想要String
在数据库上更新的内容,如果String
存在,或者,如果它不存在,我想创建一个新的String
并将其保存在数据库中.
我想到了下面但很明显它不是我想要的:
Maybe<String> source = Maybe.just(new String("foo")); //oversimplified source
source.switchIfEmpty(Maybe.just(new String("bar"))).subscribe(result ->
System.out.println("save to database "+result));
source.subscribe(result -> System.out.println("update result "+result));
Run Code Online (Sandbox Code Playgroud)
以上显然产生了
save to database foo
update result foo
Run Code Online (Sandbox Code Playgroud)
我也试过下面给出了预期的结果,但仍觉得它......很奇怪.
Maybe<String> source = Maybe.just(new String("foo")); //oversimplified source
source.switchIfEmpty(Maybe.just(new String("bar")).doOnSuccess(result ->
System.out.println("save to database "+result))).subscribe();
source.doOnSuccess(result -> System.out.println("update result "+result)).subscribe();
Run Code Online (Sandbox Code Playgroud)
如何在结果存在时以及何时不存在时执行操作?该用例应该如何在rxjava2中处理?
我尝试了以下,它看起来比我上面提到的更清洁.注意确定推荐使用rxjava2但是......
Maybe.just(new String("foo"))
.map(value -> Optional.of(value))
.defaultIfEmpty(Optional.empty())
.subscribe(result -> {
if(result.isPresent()) {
System.out.println("update result "+result);
}
else {
System.out.println("save to …
Run Code Online (Sandbox Code Playgroud) 我正在使用Retrofit 2和RxJava2调用API。如果呼叫失败,在某些情况下(例如,没有Internet连接),我想向用户显示错误对话框,然后让他重试。
当我使用RxJava时,我正在考虑使用它,.retryWhen(...)
但是我不知道该怎么做,因为它需要等待用户按下对话框上的按钮。
目前,我显示该对话框,但在用户按下任何按钮之前会重试。另外,我希望在用户按下“取消”时不重试该呼叫。
这是我目前的代码:
private void displayDialog(DialogInterface.OnClickListener positive, DialogInterface.OnClickListener negative) {
AlertDialog.Builder builder = new AlertDialog.Builder(MainActivity.this);
builder.setMessage("Unexpected error, do you want to retry?")
.setPositiveButton("Retry", positive)
.setNegativeButton("Cancel", negative)
.show();
}
private Observable<Boolean> notifyUser() {
final PublishSubject<Boolean> subject = PublishSubject.create();
displayDialog(
(dialogInterface, i) -> subject.onNext(true),
(dialogInterface, i) -> subject.onNext(false)
);
return subject;
}
private void onClick() {
Log.d(TAG, "onClick");
getData()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.retryWhen(attempts -> {
return attempts.zipWith(
notifyUser(),
(throwable, res) -> res);
})
.subscribe(
s -> {
Log.d(TAG, "success"); …
Run Code Online (Sandbox Code Playgroud) 我有两个源可观察量.我想合并两个源可观察对象,但是只要其中一个源可观察对象完成,合并的可观察对象就会完成.
期望的行为:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged" ---1---2----3--4--x
Run Code Online (Sandbox Code Playgroud)
如果其中一个源出现错误,则错误应传播到合并的observable:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged" ---1---2----3--4--ex
Run Code Online (Sandbox Code Playgroud)
"merge"运算符仅在两个源完成时完成合并流:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged" ---1---2----3--4-----------------------------x
Run Code Online (Sandbox Code Playgroud)
我怎样才能实现我想要的行为?
这个问题可能听起来很简单,但我很难过.
我可以通过这种方式进行改造2:
class RetrofitClient {
private static Retrofit retrofit = null;
static Retrofit getClient(String baseUrl) {
if (retrofit == null) {
retrofit = new Retrofit.Builder()
.baseUrl(baseUrl)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
}
return retrofit;
}
}
Run Code Online (Sandbox Code Playgroud)
Api服务界面:
@POST("postsInit")
@FormUrlEncoded
Call<InitPost> postInit(
@Field("appVersion") String versionName,
@Field("appId") String applicationId,
);
Run Code Online (Sandbox Code Playgroud)
最后:
apiService.postInit(versionName, applicationId).enqueue(new Callback<InitPost>() {
@Override
public void onResponse(@NonNull Call<InitPost> call, @NonNull Response<InitPost> response) {
if (response.isSuccessful()) {
Timber.d("post submitted to API");
getInitResponse();
}
}
@Override
public void onFailure(@NonNull Call<InitPost> call, @NonNull Throwable t) …
Run Code Online (Sandbox Code Playgroud) 最近,我已经了解了在设计应用程序的后端(存储库,而不是服务器端后端)时拥有单一来源(SSOT)的重要性.https://developer.android.com/topic/libraries/architecture/guide.html
通过开发新闻提要应用程序(使用awesome https://newsapi.org/),我试图了解有关应用程序架构的更多信息.但是,我不确定如何实现分页.顺便说一下,.:我正在使用MVVM作为我的表示层.View订阅了ViewModel的LiveData.ViewModel订阅了RxJava流.
我提出了以下方法:
interface NewsFeedRepository {
fun getNewsFeed(): Observable<List<Article>>
fun refreshFeed(): Completable
fun loadMore(): Completable
}
Run Code Online (Sandbox Code Playgroud)
ViewModel订阅getNewsFeed()
Observable,它在每次数据库中的基础数据(SSOT)发生更改时发出数据.但是,我的问题是关于这个loadMore()
方法.此方法尝试从API加载更多文章,并在成功时将它们插入本地数据库.这会导致getNewsFeed()
发出新的Feed数据.
我的问题:
1.存储库是否应负责同步API和本地数据库数据?或者存储库是否应该使用一些管理同步网络/本地数据的"低级"API?
2.该方法loadMore
返回一个Completable
看似奇怪/误导的方法.是否有更好的方法在存储库界面中指示分页?
3.分页意味着存储当前页数并在从API请求数据时使用它.应该在哪里存储当前页数?在存储库中?或者在存储库使用的某些"低级"组件中?
interface SearchArticleRepository {
fun searchArticles(sources: List<NewsSource>? = null, query: String? = null): Flowable<List<Article>>
fun moreArticles(): Completable
}
Run Code Online (Sandbox Code Playgroud)
此存储库用于搜索文章.该方法moreArticles()
尝试根据最后一次调用从API加载更多文章searchArticles(...)
.例如:ViewModel调用repo.searchArticles(q = "bitcoin")
.调用repo.moreArticles()
尝试"bitcoin"
从API 加载包含查询字符串的更多文章并与本地数据库同步.
我的问题再一次:
1.是否可以存储有关上一个请求的信息(searchArticles(request)
在存储库中?我也可以考虑传递请求参数moreArticles()
,例如moreArticles(sources: List<NewsSource>? = null, query: …
在阅读了多篇博客文章和文档后,我得出结论,以下doOnSubscribe
将在工作线程上执行:
Observable.just(1)
.observeOn(Schedulers.io())
.doOnSubscribe(__ -> Log.d("Testing", "Testing")) // Shouldn't this be on worker thread?
.subscribe();
Run Code Online (Sandbox Code Playgroud)
但是在调试之后,我看到doOnSubscribe
在主线程上执行了.我认为doOnSubscribe
它与其他运算符类似,因此在与subscribeOn
和相结合时具有类似的线程行为observeOn
.
我错过了什么?如何将doOnSubscribe
执行移至后台线程?
rx-java2 ×10
android ×7
rx-java ×4
kotlin ×2
retrofit2 ×2
android-room ×1
github-api ×1
java ×1
retrywhen ×1
rxjs ×1