假设我有两个可观测量:A和B发射物品:A:1,2,3,4,5 B:2,4,6
有没有办法通过删除也是第二序列的项目来过滤第一个序列?
[edit]所需的数据流:从可观察的B(bList)加载所有项目,然后从A加载项目,根据标准过滤它们:!bList.contains(item)
我想用rxAndroid重写项目.在我的代码中,我有这个简单的部分:
public void chkLogin(String login, String password) {
String[] myTaskParams = { login, password };
new ChkLoginTask().execute(myTaskParams);
}
private class ChkLoginTask extends AsyncTask<String, Void, LoginStatus> {
@Override
protected void onPreExecute(){
view.showLoadingSpinner();
}
@Override
protected LoginStatus doInBackground(String... params) {
return app.getAPI().checkLogin(params[0], params[1]);
}
@Override
protected void onPostExecute(LoginStatus result) {
view.hideLoadingSpinner();
if(result == null)
view.onConnectionError();
else{
view.onChkLoginSuccess(result);
}
}
}
Run Code Online (Sandbox Code Playgroud)
在这段代码中,我只用两个参数在AsyncTask中调用我的api方法,然后根据它的结果做出反应.
现在我想用rxAndroid编写这段代码,但我坚持下去......我知道rx如何处理对UI元素的反应,但无法找到有关它如何与参数化AsyncTask一起工作的信息
在这种情况下使用RX是否正确?
我想结合2个observable并在AndroidSchedules.mainThread()上执行"combine function".我添加了.observeOn(AndroidSchedules.mainThread()),但我仍然得到"java.lang.IllegalStateException:不在主线程上".
Observable<List<Post>> animateCameraAndGetPostsByProjection = Observable.combineLatest(
mapObservableProvider.getMapReadyObservable(),
LocationService.getUpdatedOrLastKnownLocation(this),
(googleMap, location) -> {
CameraUpdate cameraUpdate = CameraUpdateFactory.newLatLngZoom(new LatLng(location.getLatitude(),location.getLongitude()),15);
googleMap.moveCamera(cameraUpdate);
return googleMap.getProjection().getVisibleRegion();
})
.flatMap(vr -> new RestService().getApi().getPostsByMapProjection(vr.farLeft.latitude,vr.farLeft.longitude,vr.nearRight.latitude,vr.nearRight.longitude))
.observeOn(AndroidSchedulers.mainThread());
Run Code Online (Sandbox Code Playgroud) 我想在执行 Observable 时显示 ProgressBar:
Observable<T> observable;
Observer<T> observer;
...................
observable.doOnSubscribe(()->{showProgressBar();}
.finallyDo(()-> {hideProgressBar();})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
...................
protected void showProgressBar() {
if (mProgressBar != null)
mProgressBar.setVisibility(View.VISIBLE);
}
protected void hideProgressBar() {
if (mProgressBar != null)
mProgressBar.setVisibility(View.GONE);
}
Run Code Online (Sandbox Code Playgroud)
;
但我收到此错误:
android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.
Run Code Online (Sandbox Code Playgroud)
在线的 mProgressBar.setVisibility(View.VISIBLE);
如何运行showProgressBar()的doOnSubscribe()?
我正在尝试创建一个Observable这样的,它将在一个时间间隔内从网络加载一些数据,并在用户刷新页面时.这是我到目前为止的要点:
PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
Observable.interval(0, 3, TimeUnit.SECONDS),
refreshSubject
)
.flatMap(t -> {
// network operations that eventually return a value
// these operations are not observables themselves
// they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
// update ui with data
}, error -> {
// do something with error
});
Run Code Online (Sandbox Code Playgroud)
后来在刷新回调中我有:
refreshSubject.onNext(0L);
Run Code Online (Sandbox Code Playgroud)
它在间隔很好的情况下运行,但是当我刷新时,它会爆炸NetworkOnMainThreadException.我以为我用subscribeOn/ 处理了这个observeOn.我错过了什么?另外,为什么当Observer从间隔触发时这不会导致崩溃?
我创建了一个简单IntentService的文件和一些数据上传到服务器。我希望能够Toast在上传完成时显示,但是我需要在主线程上才能执行此操作。
由于我将RetroFit与RxJava结合使用来处理实际请求,因此我认为我应该使用该observeOn(AndroidSchedulers.mainThread())方法Toast在主线程上创建。问题是(由于服务器的原因)我可能不得不重新发送请求,在这种情况下,我必须postRequest()再次调用该方法。
然后,此新请求现在位于主线程上。因此,为了避免使用该subscribeOn(Schedulers.io())方法,考虑到Service已经在自己的线程上,这似乎是一种浪费。
是否有指定的方式Observable应该subscribeOn()的Service线程?还是应该仅继承子类Service而不IntentService使用io线程?
private void postRequest(String title, LatLng location,
String description, MultipartBody.Part attachment) {
mNetworkService.postRequest(title, location.latitude, location.longitude,
description, attachment).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(response -> {
if (response.status() == 1) {
ToastUtil.makeText(getApplicationContext(), "Request Sent");
stopSelf();
} else {
postRequest(title, location, description, attachment);
}
});
}
Run Code Online (Sandbox Code Playgroud) 我想在用户点击按钮时调用请求.但是当第一次请求没有响应时,用户想要更改参数并使用相同的URL调用新请求.
问题 如何取消第一个请求并使用相同的URL调用新请求.
ServiceFactory
.createService()
.getSomething(page)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DefaultObserver<SomthingResponse>() {
@Override
public void onNext(@NonNull SomthingResponse response) {
showSomthing()
hideProgressDialog();
}
@Override
public void onError(@NonNull Throwable e) {
hideProgressDialog();
showErrorMessage(e.getMessage());
}
@Override
public void onComplete() {
}
});
Run Code Online (Sandbox Code Playgroud)
当用户点击按钮时,用户会将页面参数发送到查询字符串,但此请求尚未完成.用户将更改为页面参数并发送新请求.我想取消第一个请求.
谢谢
有没有办法在运行时更改Observable.interval周期?有没有一种方法可以停止和恢复Observable.interval节拍?有没有办法重置间隔时间?
实际上,我正在使用以下代码在一段时间内永远执行某项操作,但是在运行期间我无法对其进行控制,因此必须在运行时停止,恢复,休息和更改周期。
Observable.interval(8, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TAG", "onSubscribe");
}
@Override
public void onNext(Long aLong) {
myMethod();
}
@Override
public void onError(Throwable e) {
Log.i("TAG", "onError");
}
@Override
public void onComplete() {
Log.i("TAG", "onComplete");
}
});
Run Code Online (Sandbox Code Playgroud)
我尝试用Google搜索它以找到一种解决方案,但不幸的是我没有找到任何解决方案,如果有的话,我需要帮助或资源。
我使用RxTextView.textChanges的EditText,当用户键入的变化值EditText,以数字转换为货币格式如下图所示:
1,000
Run Code Online (Sandbox Code Playgroud)
但是我看不到任何将数字转换为货币格式的数字。
我正在使用来自: NumberFormat.getNumberInstance(Locale.US).format(productPrice);
我的代码就像下面这样:
Observable<CharSequence> observableDiscountPrice = RxTextView.textChanges(discountPriceEdittext);
observableDiscountPrice.map(new Function<CharSequence, Boolean>() {
@Override
public Boolean apply(@io.reactivex.annotations.NonNull CharSequence charSequence) throws Exception {
try {
if (charSequence.length() > 0) {
String pPrice = NumberFormat.getNumberInstance(Locale.US).format(charSequence.toString());
originalPriceEdittext.setText(String.valueOf(pPrice));
return true;
} else {
return false;
}
} catch (Exception e) {
return true;
}
}
}).subscribe(new Subject<Boolean>() {
@Override
public boolean hasObservers() {
return false;
}
@Override
public boolean hasThrowable() {
return false; …Run Code Online (Sandbox Code Playgroud) 我正在尝试在 kotlin 中创建 Observable 。但是它在 OnSubscribe 方法上给出了错误未解决的引用
fun getDisposableObserver(): Observable<Background> {
return Observable.create(object :Observable.OnSubscribe<Background> ->{})
}
Run Code Online (Sandbox Code Playgroud)
我试过这个片段。它也不起作用
Observable.create(object : ObservableOn.OnSubscribe<Int> {
override fun call(subscriber: Subscriber<in Int>) {
for(i in 1 .. 5)
subscriber.onNext(i)
subscriber.onCompleted()
}
})
Run Code Online (Sandbox Code Playgroud)
我做错了什么?,如何创建 Observable?
在http://reactivex.io/RxJava/javadoc/io/reactivex/Observable中指出:
默认情况下,Observable 的运算符以 128 个元素的缓冲区大小运行(参见 Flowable.bufferSize(),可以通过系统参数 rx2.buffer-size 全局覆盖。
我的问题是,我如何访问和设置rx2.buffer-size?如果我做:
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import io.reactivex.*;
Run Code Online (Sandbox Code Playgroud)
...其次是:
Integer bufferSize = rx2.buffer-size;
Run Code Online (Sandbox Code Playgroud)
Studio 通知我:
import io.reactivex.*;) 如何覆盖系统参数rx2.buffer-size?
我的项目正在使用
io.reactivex.rxjava2/rxandroid/2.0.1
io.reactivex.rxjava2/rxjava/2.1.11
Run Code Online (Sandbox Code Playgroud) 我目前正在学习RxJava/RxAndroid的基础知识,并尝试创建一个非常基本的Retrofit测试应用程序.
我在这个例子中使用RxJava 1.2.6.
但是,我一直没能找到如何在这是一个列表的对象执行功能的任何明显的例子中可观察的对象.
例如,如果我有以下POJO
public class AgencyResponse {
private List<Agency> agencies;
private int total;
private int count;
private int offset;
public List<Agency> getAgencies() {
return agencies;
}
public int getTotal() {
return total;
}
public int getCount() {
return count;
}
public int getOffset() {
return offset;
}
}
Run Code Online (Sandbox Code Playgroud)
以及
public class Agency {
private int id;
private String name;
private String abbrev;
public int getId() {
return id;
}
public String getName() {
return name;
} …Run Code Online (Sandbox Code Playgroud) 之前我在java中使用了模型类autovalue.现在,它被转换为Kotlin数据类.
型号类 - >
public static SampleClass create(
@NonNull final SamplePost post,
@NonNull final List<SampleComment> comments) {
return new AutoValue_SampleClass(post, comments);
}
Run Code Online (Sandbox Code Playgroud)
调用类 - >
return Observable.zip(...
SampleClass::create);
}
Run Code Online (Sandbox Code Playgroud)
新数据类 - >
data class SampleClass(val post: DiscussionPost,
val comments: List<SampleComment>) : Parcelable
Run Code Online (Sandbox Code Playgroud)
现在如何为数据类调用它?