我将开始在我的Android项目中使用反应式编程.我使用Kotlin作为主要语言,现在我想申请Rx.我的第一选择是RxAndroid,但后来我注意到有RxKotlin.
据我所知,两者都是从RxJava分叉的,所以RxAndroid可能为android常见任务提供了一些API.另一方面,RxKotlin支持lambas开箱即用,让我避免将kotlin与java混合.
在这种情况下哪一个是首选的库?
考虑以下用例:
我最终实现了自定义运算符,OperatorDebounceWithTime然后像这样使用它
.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))
Run Code Online (Sandbox Code Playgroud)
CustomOperatorDebounceWithTime立即发送第一个项目,然后使用OperatorDebounceWithTime操作员的逻辑去除后期项目.
是否有更简单的方法来实现所描述的行为?让我们跳过compose运算符,但它没有解决问题.我正在寻找一种方法来实现这一点,而无需实现自定义运算符.
所以我试图创建一个定期发射的可观察物,但由于某些我无法弄清楚的原因,它只发射一次.谁能看到我做错了什么?
Observable<Long> observable = Observable.timer(delay, TimeUnit.SECONDS, Schedulers.io());
subscription = observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
searchByStockHelper.requestRemoteSearchByStock();
}
});
Run Code Online (Sandbox Code Playgroud)
目前延迟设置为2
我只是在学习Rx-java和Rxandroid2,我只是混淆了SubscribeOn和ObserveOn之间的主要区别.
这将每5秒发出一次滴答声.
Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
.subscribe(tick -> Log.d(TAG, "tick = "+tick));
Run Code Online (Sandbox Code Playgroud)
要停止它你可以使用
Schedulers.shutdown();
Run Code Online (Sandbox Code Playgroud)
但随后所有调度程序停止,以后无法恢复计时.我怎样才能停止并恢复"优雅地"发出的声音?
我有一个关于如何取消订阅可观察量的问题.我有两个代码,我不确定哪个更好.
示例1 - >流完成后取消订阅订阅者:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
progressdialog.dissmiss();
unsubscribe();
}
@Override
public void onError(Throwable e) {
progressdialog.dissmiss();
}
@Override
public void onNext(String s) {
// do something with data
}
}
Run Code Online (Sandbox Code Playgroud)
示例2 - >销毁活动后取消订阅订阅:
private void test(){
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
progressdialog.dissmiss();
}
@Override
public void onError(Throwable e) {
progressdialog.dissmiss();
}
@Override
public void onNext(String s) {
// do something with data
}
};
subscription …Run Code Online (Sandbox Code Playgroud) 我正在使用Retrofit 2.0来进行返回Observables的api调用.当呼叫完成并且响应符合预期时,一切正常.现在让我们说我们有一个错误响应,它会抛出一个onError.我想阅读响应正文,即使它是一个错误.
例
@FormUrlEncoded
@POST("tokenLogin")
Observable<LoginResponse> loginWithToken(
@Field("token") String pin
);
Run Code Online (Sandbox Code Playgroud)
当请求和响应有效时,我得到正确的observable,并在出现错误时按预期调用onError.
正确回应:
{ "status" : "authenticated" }
Run Code Online (Sandbox Code Playgroud)
Observable将其转换为正确的Observable,我可以将响应读作LoginResponse对象.
现在,错误响应如下:
{ "errorMessage" : "You need to take some xyz action" }
Run Code Online (Sandbox Code Playgroud)
我想阅读该错误响应并将消息显示给用户.我该怎么做呢?
是否可以使用RxJava/RxAndroid和Retrofit执行定期的http请求,每隔x秒更新一次数据?
目前我正在使用一个每x秒触发的IntentService和Recursive Handler/Runnable.我想知道我是否可以删除所有这些并让RxJava处理请求.
final RestClient client = new RestClient();
final ApiService service = client.getApiService();
public interface ApiService {
@GET("/athletes")
public Observable<List<Athlete>> getAthletes();
}
service.getAthletes()
.retry(3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<Athlete>>() {
@Override
public void call(List<Athlete> athletes) {
// Handle Success
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// Handle Error
}
});
Run Code Online (Sandbox Code Playgroud)
编辑
完成所有操作后,我最终得到了以下代码.欢迎任何更新.
final Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
obs = Observable.interval(30, TimeUnit.SECONDS, scheduler)
.flatMap(tick -> service.getAthletes(1, 0l))
// Performed on service.getAthletes() observable
.subscribeOn(scheduler)
.observeOn(AndroidSchedulers.mainThread())
.doOnError(err -> …Run Code Online (Sandbox Code Playgroud) 无法弄清楚为什么会这样.我的调用不会触发任何一个rx回调(onCompleted(),onError(),onNext()).我收到的唯一的东西是这个okhttp输出:
D/OkHttp: --> GET https://api.privatbank.ua/p24api/exchange_rates?json=true&date=20.11.2016 http/1.1
D/OkHttp: --> END GET
D/OkHttp: <-- HTTP FAILED: java.io.IOException: Canceled
Run Code Online (Sandbox Code Playgroud)
改造模块:
@Module
public class RestModule {
@Provides
@Singleton
public HttpLoggingInterceptor providesHttpLogginInterceptor() {
return new HttpLoggingInterceptor().setLevel(HttpLoggingInterceptor.Level.BODY);
}
@Provides
@Singleton
public OkHttpClient providesOkHttpClient(@NonNull HttpLoggingInterceptor loggingInterceptor) {
return new OkHttpClient.Builder()
.addInterceptor(loggingInterceptor)
.connectTimeout(ConstantsManager.CONNECTION_TIME_OUT, TimeUnit.SECONDS)
.readTimeout(ConstantsManager.READ_TIME_OUT, TimeUnit.SECONDS)
.build();
}
@Provides
@Singleton
public Gson providesGson() {
return new GsonBuilder().create();
}
@Provides
@Singleton
public Retrofit providesRetrofit(@NonNull OkHttpClient okHttpClient, @NonNull Gson gson) {
return new Retrofit.Builder()
.baseUrl(ConstantsManager.BASE_URL)
.client(okHttpClient)
.addConverterFactory(SimpleXmlConverterFactory.create())
.addConverterFactory(GsonConverterFactory.create(gson))
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build(); …Run Code Online (Sandbox Code Playgroud)