我在一个新项目上使用RxJava 2(我已经使用了RxJava 1很长时间了)我在使用flatMap(或者flatMapSingle?)时遇到了一些问题.在整个概念中似乎都缺少一些东西.
mObjectManager.getAllObjects返回一个AsyncProcessor<List<Object>>.(我用'Object'替换了实际的Class名称).
Disposable subscription = mObjectManager.getAllObjects()
.flatMapSingle(new Function<List<Object>, SingleSource<Object>>() {
@Override
public SingleSource<Object > apply(@io.reactivex.annotations.NonNull List<Object> objects) throws Exception {
// TODO WHAT GOES HERE?!
}
}).filter(new Predicate<Object>() {
@Override
public boolean test(@io.reactivex.annotations.NonNull Object object) throws Exception {
return TextUtils.isEmpty(mSearchTerm) || object.name.toLowerCase().contains(mSearchTerm.toLowerCase());
}
}).toSortedList(new Comparator<Object>() {
@Override
public int compare(Object c1, Object c2) {
return c1.name.compareTo(c2.name);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<Object>>() {
@Override
public void accept(@io.reactivex.annotations.NonNull List<Object> objects) throws Exception {
processObjects(objects);
}
});
Run Code Online (Sandbox Code Playgroud)
我想知道如何将列表转换为SingleSource?如何在RxJava …
在Kotlin中,函数的最终语句可以解释为其返回值.
可以将以下示例的情况简化为更简洁吗?
{ text: String ->
val validated = validateText(text)
if (validated) {
actOnValidation()
}
validated
}
Run Code Online (Sandbox Code Playgroud)
我想要这样做的一个具体案例是在使用RxJava的示例中 - 即使有更好的Rx方法,我也对纯Kotlin解决方案感兴趣(如果存在的话).
fun inputChainObservable(targetField: TextView, chainedField: TextView): Observable<Boolean> {
return targetField.textChanges()
.observeOn(AndroidSchedulers.mainThread())
.map { cs: CharSequence? ->
val hasInput = validateText(cs.toString())
if (hasInput) {
chainedField.requestFocus()
}
hasInput
}
}
Run Code Online (Sandbox Code Playgroud) 我糊涂了.我有一个方法,做一些长期工作(约5秒)并返回String一个结果.我把这个函数包装成这样的Observablesmth:
private Observable<String> getJobObservable() {
return Observable.just(doJob());
}
Run Code Online (Sandbox Code Playgroud)
不过,即使.subscribeOn(Schedulers.computation())和.observeOn(Schedulers.computation())我的doJob()方法计算的mainThread
,但是,在我已经改变Observable.just(),以Observable.fromCallable()一切工作正常.为什么Observable.just()不对线程更改做出反应?
我有一个返回 a 的方法,Single<List<Item>>我想获取此列表中的每个项目并将其向下游传递给返回Completable. 我想等到每个项目成功完成并返回Completable结果。我最初的做法是分别处理每个项目使用flatMapIterable和组合使用的结果toList,但我不能把toList一个上Completable对象。有没有其他方法可以以这种方式将许多Completable任务“聚合”为一个Completable?这是我到目前为止所拥有的:
public Single<List<Item>> getListOfItems() {
...
}
public Completable doSomething(Item item) {
...
}
public Completable processItems() {
return getListOfItems()
.toObservable()
.flatMapIterable(items -> items)
.flatMapCompletable(item -> doSomething(item))
.toList() // ERROR: No method .toList() for Completable
.ignoreElements();
}
Run Code Online (Sandbox Code Playgroud) 如何合并两个BehaviorSubjects,使它们表现得像一个BehaviorSubject?
我有这样的事情:
class Solution {
public static void main(String[] args) {
Subject<List<Integer>> left = BehaviorSubject.createDefault(Arrays.asList(1, 2, 3));
Subject<List<Integer>> right = BehaviorSubject.createDefault(Arrays.asList(4, 5, 6));
Single<List<Integer>> merged = left.mergeWith(right).reduce(new ArrayList<Integer>(), (l, r) -> {
List<Integer> merged1 = new ArrayList<>(l.size() + r.size());
merged1.addAll(l);
merged1.addAll(r);
return merged1;
});
merged.subscribe(System.out::println);
}
}
Run Code Online (Sandbox Code Playgroud)
我希望得到一些东西[1, 2, 3, 4, 5, 6],但subscribe什么都不打印.
在我的Android应用程序中,我想更新数据库条目的一部分(使用Room),然后在更新完成后立即从数据库中读取整个条目.
在阅读了RxJava Completable的文档后,我希望公开的最终Single andThen(SingleSource next)为我完成这项工作.但以下代码段仅记录"可完成".不记录"Single"和"From Single".
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;
Completable completable = Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
Log.e(LOG, "Completable");
}
});
Single<Long> single = Single.fromCallable(new Callable<Long>() {
@Override
public Long call() throws Exception {
Log.e(LOG, "Single");
return Long.valueOf(123);
}
});
completable.andThen(single).subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(LOG, "From Single: " + aLong.toString());
}
}, new Consumer<Throwable>() { …Run Code Online (Sandbox Code Playgroud) 我想执行以下操作.我有一个事务列表,我希望通过为每个事务发出2个api请求(我正在使用retrofit2)来更新,然后将结果保存到数据库中(使用观察者).经过一些搜索,我决定使用zip运算符来组合2个请求,但我遇到的问题是我无法确定整个过程何时完成更新UI.代码看起来像这样.
for (Transaction realmTransaction : allTransactions) {
Observable<Map<String, String>> obs1 = getObs1(realmTransaction);
Observable<Map<String, String>> obs2= getObs2(realmTransaction);
Observable.zip(obs1, obs2,
(map1, map2) -> {
Map<String, String> combined = new HashMap<>();
// do some processing and return a single map after
return combined;
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver(realmTransaction));
}
public Observer<Map<String, String>> getObserver(Transaction t){
return new Observer<Map<String, String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Map<String, String> stringStringMap) {
// update database
}
@Override
public void onError(Throwable e) { …Run Code Online (Sandbox Code Playgroud) 因此,我尝试使用rxjava2和改造来实现即时搜索,该过程很简单,只要用户更改了文本publish.onNext()(发布是一个PublishSubject对象)就可以了。我添加了filter和debounce以及switch映射运算符,以在文本的长度大于阈值并且不会同时使用连续输入进行调用时方便从服务器进行搜索。
这是代码:
subject = PublishSubject.create();
getCompositeDisposable().add(subject
.filter(s -> s.length() >= 3)
.debounce(300,
TimeUnit.MILLISECONDS)
.switchMap(s -> getDataManager().getHosts(
getDataManager().getDeviceToken(),
s).observeOn(Schedulers.io()))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(hostResponses -> {
getMvpView().hideEditLoading();
if (hostResponses.size() != 0) {
if (this.hostResponses != null)
this.hostResponses.clear();
this.hostResponses = hostResponses;
getMvpView().setHostView(getHosts(hostResponses));
} else {
getMvpView().onFieldError("No host found");
}
}, throwable -> {
getMvpView().hideEditLoading();
if (throwable instanceof HttpException) {
HttpException exception = (HttpException)throwable;
if (exception.code() == 401) {
getMvpView().onError(R.string.code_expired,
BaseUtils.TOKEN_EXPIRY_TAG);
}
}
})
);
Run Code Online (Sandbox Code Playgroud)
现在我的代码可以正常工作,可以满足我的需要,但是当我输入一个长字符串并按退格按钮时,我遇到了一个错误,那就是当清除AutoCompleteTextView的文本时,会引发异常
这是异常的堆栈跟踪:
java.io.InterruptedIOException: thread interrupted
at …Run Code Online (Sandbox Code Playgroud) 我在春季启动中使用Rxjava2。
我在服务器上有500个并发请求。
每个请求产生10个线程,这些线程调用其他服务(因此IO工作)
因此,在这种情况下,我应该使用Schedulers.io()还是Schedulers.compuatation()。
基本上,我的困惑是理想上io()应该使用的,因为这是IO工作,但这会创建大量线程吗?
还可以指定计算线程的池大小吗?还可以指定io线程的池大小吗?
我使用的版本是
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
implementation 'android.arch.persistence.room:runtime:2.1.0-alpha04'
kapt 'android.arch.persistence.room:compiler:2.1.0-alpha04'
implementation 'android.arch.persistence.room:rxjava2:2.1.0-alpha04'
Run Code Online (Sandbox Code Playgroud)
道是
@Insert(onConflict = OnConflictStrategy.REPLACE)
fun insertStore(stores: Stores): Completable
Run Code Online (Sandbox Code Playgroud)