我在Observable中解析SVG文件.在解析XML时,点作为"Path"对象发出.解析发生在一个单独的线程中,我想逐点绘制SVG文件.换句话说,我想逐个向UI发射点,例如每50毫秒一个.
private void drawPath(final String chars) {
Observable.create(new Observable.OnSubscribe<Path>() {
@Override public void call(Subscriber<? super Path> subscriber) {
try {
while ([omitted]) {
// omitted: a lot of processing
// an XML path from an SVG file is parsed into an Android path to be drawn on a canvas
// this happens point by point
subscriber.onNext(path); // emit path point by point as the XML is processed
}
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}).buffer(50, TimeUnit.MILLISECONDS, …Run Code Online (Sandbox Code Playgroud) 我有一个SearchView执行网络请求来搜索某些曲目,然后用结果填充RecylerView.我发现这个代码工作正常.
我已经通过适配器集成了RecyclerView EmptyView,但现在我正在尝试在此代码中集成LoadingView(Progress)和ErrorView.我试图在ConcatMap中将LoadView(ProgressBar)放在Visibility True上但是得到的错误是"只有创建视图层次结构的原始线程可以触及它的视图."这可以解决在MainThread上运行它但我确定有一个更好的方法来做到这一点.
有人可以更好地了解显示/隐藏ErrorView和LoadingView的逻辑可以在何处以及如何集成到此代码中?
我也在使用RxBinding.也许还使用RxRecyclerView会是一个好主意?
RxSearchView.queryTextChanges(searchView).
filter(charSequence ->
!TextUtils.isEmpty(charSequence))
.throttleLast(100, TimeUnit.DAYS.MILLISECONDS)
.debounce(200, TimeUnit.MILLISECONDS)
.onBackpressureLatest()
.concatMap(searchTerm ->
{
return searchTracks(searchTerm).
.subscribeOn(Schedulers.io())
.onErrorResumeNext(throwable1 -> {
//handle error somehow, change UI
return Observable.empty();
}
);
}
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(tracks -> {
populateTracks(tracks);
}
});
}, throwable -> {
//show errorView
});
Run Code Online (Sandbox Code Playgroud) 运行我的示例代码后,我期望结果如下.
atest
btest
ctest
但实际上什么都没发生.
请让我知道我的代码有什么问题.
Subject<String, String> subject = PublishSubject.create();
subject.onNext("test");
Observable<String> observable = Observable.from(new String[] {"a", "b", "c"}).repeat(2);
observable.withLatestFrom(subject, (s1, s2) -> s1 + s2)
.subscribe(s -> System.out.println(s));
Run Code Online (Sandbox Code Playgroud) 我试图以一些延迟来监控文本更改,以避免来自侦听器的垃圾邮件.但是我当然不希望收到一些已经处理过的物品.
这个观察者
RxTextView.textChanges(editText)
.delay(2, TimeUnit.SECONDS)
.distinctUntilChanged()
.filter(charSequence -> charSequence.length() != 0)
.subscribe(charSequence1 -> Log.e("!@#", charSequence1));
Run Code Online (Sandbox Code Playgroud)
当我输入"abcd"时,会弹出这样的项目:
E/!@#: abcd
E/!@#: abcd
E/!@#: abcd
E/!@#: abcd
Run Code Online (Sandbox Code Playgroud)
所以我收到了4次emmited项目,但字符串是平等的,并且有distinctUntilChanged.为什么distinctUntilChanged不在这种情况下工作?是否有可能通过rx运算符延迟实现此逻辑?
我正在尝试理解RxJava,我确信这个问题是无意义的...我使用RxJava这个代码:
public Observable<T> getData(int id) {
if (dataAlreadyLoaded()) {
return Observable.create(new Observable.OnSubscribe<T>(){
T data = getDataFromMemory(id);
subscriber.onNext(data);
});
}
return Observable.create(new Observable.OnSubscribe<T>(){
@Override
public void call(Subscriber<? super String> subscriber) {
T data = getDataFromRemoteService(id);
subscriber.onNext(data);
}
});
}
Run Code Online (Sandbox Code Playgroud)
并且,例如,我可以这样使用它:
Action1<String> action = new Action<String>() {
@Override
public void call(String s) {
//Do something with s
}
};
getData(3).subscribe(action);
Run Code Online (Sandbox Code Playgroud)
而另一个回调实现Runnable:
public void getData(int id, MyClassRunnable callback) {
if (dataAlreadyLoaded()) {
T data = getDataFromMemory(id);
callback.setData(data);
callback.run();
} else { …Run Code Online (Sandbox Code Playgroud) 我有一个WS,如果成功则返回200,没有任何正文,否则返回420,错误正文中的json不成功
返回类型为
Observable<Response<Void>>
Run Code Online (Sandbox Code Playgroud)
出于某种原因,如果出现420代码错误,onNext(Response<Void> value)则调用onError(Throwable e)该代码,而不是针对其他任何不成功的请求调用该代码。
为什么仅在这种情况下才调用onNext而不是onError?如果请求未返回200,则可以调用onError?
我正在尝试压缩两个在不同线程上发出的Observable:
Observable<String> xxxx1 = Observable.fromCallable((Func0<String>) () -> {
try {
Thread.sleep((long)(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "First";
})
.doOnNext(s -> Log.d("TEEEST", "1 onNext " + s + " thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation());
Observable<String> xxxx2 = Observable.fromCallable((Func0<String>) () -> {
try {
Thread.sleep((long)(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Second";
})
.doOnNext(s -> Log.d("TEEEST", "2 onNext " + s + " thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.io());
Observable.zip(xxxx1, xxxx2, (s1, s2) -> …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用.distinctUntilChanged()它,并且不会switchmap()在第一次之后传递价值。
RxTextView.textChanges(etUserQuery).debounce(300, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread()).filter(charSequence -> {
if (charSequence.toString().isEmpty()) {
etUserQuery.setHint("Please type username");
return false;
} else
return true;
}).distinctUntilChanged()
.switchMap(charSequence -> vm.dataFromNetwork(charSequence.toString()))
.subscribe(fetchUserResponce -> {
noDataText.setVisibility(fetchUserResponce.getItems().size() == 0 ? View.VISIBLE : View.GONE);
mUsersListAdapter.updateData(fetchUserResponce.getItems());
}));
Run Code Online (Sandbox Code Playgroud)
是正确的使用地点.distinctUntilChanged()吗?
我有一个Completable要执行的RxJava ,然后链接到Single<Long>。我可以这样写:
return Completable.complete().toSingleDefault(0L).flatMap { Single.just(1L) }
Run Code Online (Sandbox Code Playgroud)
但这似乎不必要地复杂。我本以为Completable#toSingle()会做的,但是如果我写:
Completable.complete().toSingle { Single.just(1L) }
Run Code Online (Sandbox Code Playgroud)
我遇到错误。是否缺少功能Completable或我正在忽略某些东西?
我有一个活动,每次用户输入更改时我都会向其发出网络请求.
api定义如下:
interface Api {
@GET("/accounts/check")
fun checkUsername(@Query("username") username: String): Observable<UsernameResponse>
}
Run Code Online (Sandbox Code Playgroud)
然后是管理它的服务:
class ApiService {
var api: Api
init {
api = retrofit.create(Api::class.java)
}
companion object {
val baseUrl: String = "https://someapihost"
var rxAdapter: RxJava2CallAdapterFactory = RxJava2CallAdapterFactory.create()
val retrofit: Retrofit = Retrofit.Builder()
.baseUrl(baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(rxAdapter)
.build()
}
fun checkUsername(username: String): Observable<UsernameResponse> {
return api.checkUsername(username)
}
}
Run Code Online (Sandbox Code Playgroud)
然后在我的活动中,每当EditText内容发生变化时,我都会进行此调用:
private fun checkUsername(username: String) {
cancelSubscription()
checkUsernameDisposable = ApiService()
.checkUsername(username)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
updateUi(it)
}
}
Run Code Online (Sandbox Code Playgroud)
因此,每次输入变化时,这都会创建一个新的一次性用品.这显然是不正确的.我想要做的是使用新网络调用的结果更新现有订阅.