我在这里有点疯狂.我正在尝试创建一个Observable<BigDecimal>扩展函数(针对RxJava 2.x)来发出平均排放量,但是我得到了Single.zip()函数的编译错误.有没有人有任何想法我做错了什么?我试图明确我的所有类型,但是没有用......
import io.reactivex.Observable
import io.reactivex.Single
import java.math.BigDecimal
fun Observable<BigDecimal>.sum() = reduce { total, next -> total + next }
//compile error
fun Observable<BigDecimal>.average() = publish().autoConnect(2).let {
Single.zip(it.sum().toSingle(), it.count()) {
sum, count -> sum / BigDecimal.valueOf(count)
}
}
Run Code Online (Sandbox Code Playgroud)
Observable从传统的Java事件模式创建Rx-Java的最佳方法是什么?也就是说,给定
class FooEvent { ... }
interface FooListener {
void fooHappened(FooEvent arg);
}
class Bar {
public void addFooListener(FooListener l);
public void removeFooListener(FooListener l);
}
Run Code Online (Sandbox Code Playgroud)
我想实施
Observable<FooEvent> fooEvents(Bar bar);
Run Code Online (Sandbox Code Playgroud)
我想出的实现是:
Observable<FooEvent> fooEvents(Bar bar) {
return Observable.create(new OnSubscribeFunc<FooEvent>() {
public Subscription onSubscribe(Observer<? super FooEvent> obs) {
FooListener l = new FooListener() {
public void fooHappened(FooEvent arg) {
obs.onNext(arg);
}
};
bar.addFooListener(l);
return new Subscription() {
public void unsubscribe() {
bar.removeFooListener(l);
}
};
}
});
}
Run Code Online (Sandbox Code Playgroud)
但是,我真的不喜欢它:
它非常冗长;
需要一个监听器Observer(理想情况下,如果没有观察者,则应该没有监听器,否则一个监听器).这可以通过将观察者计数保持为一个字段, …
我刚开始使用rxjava而且卡住了.也许我没有以正确的方式使用rxjava,但我需要Observable在创建后添加项目.所以我理解你可以只是打电话Observable.just("Some", "Items"),订阅者会收到它们,但是如果我有一个异步任务,我需要在任务完成后再添加一些项目呢?我找不到类似的东西Observable.addItems("Some", "More", "Items")
我想在另一个线程(如IO线程)中使用okhttp请求一个url并进入ResponseAndroid主线程,但我不知道如何创建一个Observable.
谁能解释我为什么这样的代码:
networApi.getList()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnError(throwable -> {
throwable.getMessage();
})
.doOnNext(list -> {
coursesView.populateRecyclerView(list);
courseList = (List<Course>) courses;
}).subscribe();
Run Code Online (Sandbox Code Playgroud)
如果没有互联网进入doOnError但是进一步抛出它以便应用程序关闭,但代码如下:
networkApi.getList()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<List<? extends Course>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.getMessage();
}
@Override
public void onNext(List<? extends Course> list) {
coursesView.populateRecyclerView(list);
courseList = (List<Course>) list;
}
});
Run Code Online (Sandbox Code Playgroud)
按照我的预期工作,这意味着当没有互联网连接时,它什么都不做.
我必须定期轮询一些RESTful端点以刷新我的Android应用程序的数据.我还必须根据连接暂停和恢复它(如果手机处于离线状态,则无需尝试).我目前的解决方案是有效的,但它使用标准的Java ScheduledExecutorService来执行周期性任务,但我想留在Rx范例中.
这是我当前的代码,为简洁起见,部分内容被跳过.
userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
@Override
public void call(final Subscriber<? super UserProfile> subscriber) {
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final Runnable runnable = new Runnable() {
@Override
public void run() {
// making http request here
}
};
final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
networkStatusObservable.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean networkAvailable) {
if (!networkAvailable) {
pause();
} else {
pause();
futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
}
}
private void pause() {
for (ScheduledFuture<?> future …Run Code Online (Sandbox Code Playgroud) 我还是RxJava的新手,我在Android应用程序中使用它.我已经阅读了关于这个主题的公吨,但仍然觉得我错过了一些东西.
我有以下场景:
我有数据存储在系统中,可通过各种服务连接(AIDL)访问,我需要从该系统中检索数据(可能会发生1-n次异步调用).Rx帮助我简化了这段代码.但是,整个过程往往需要几秒钟(超过5秒+),因此我需要缓存此数据以加速本机应用程序.
此时的要求是:
初始订阅时,缓存将为空,因此我们必须等待所需的加载时间.没什么大不了.之后,应该缓存数据.
后续加载应该从缓存中提取数据,但是应该重新加载数据并且磁盘缓存应该在幕后.
问题:我有两个Observable - A和B. A包含从本地服务中提取数据的嵌套Observable(这里有吨).B更简单.B只包含从磁盘缓存中提取数据的代码.
需要解决:a)返回缓存项(如果已缓存)并继续重新加载磁盘缓存.b)缓存为空,从系统加载数据,缓存并返回.后续调用将返回"a".
我有几个人推荐一些操作,如flatmap,merge甚至主题,但由于某种原因,我无法连接点.
我怎样才能做到这一点?
我正在使用RxJava和RxAndroid与Retrofit2.
Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Run Code Online (Sandbox Code Playgroud)
在上面两个Observer上使用如下的zip操作符.
Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() {
@Override
public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) {
ArrayList<TestData> testDataList = new ArrayList();
// Add test data from response responseOne & responseTwo
return testDataList;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<ArrayList<TestData>>() {
@Override
public void onNext(ArrayList<TestData> testDataList) {
}
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted" );
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError Throwable: …Run Code Online (Sandbox Code Playgroud) 当使用rxjava 1.xi用于返回Observable<Void>处理来自改造的空响应时:
@POST( "login" )
Observable<Void> getToken( @Header( "Authorization" ) String authorization,
@Header( "username" ) String username,
@Header( "password" ) String password );
Run Code Online (Sandbox Code Playgroud)
但是因为rxjava 2.x不会发出任何东西,Void是否有任何好的做法来处理那些空的响应?
为什么我们需要将RxAndroid与RxJava一起使用?它们与RxAndroid和RxJava的实际使用之间有什么功能差异?我找不到合适的答案.
rx-java ×10
android ×7
rx-android ×4
java ×2
aidl ×1
asynchronous ×1
caching ×1
events ×1
kotlin ×1
observable ×1
okhttp ×1
retrofit ×1
retrofit2 ×1
rx-java2 ×1
subscriber ×1