我目前正在尝试使用Java中的反应式扩展来实现特定的结果,但是我无法做到这一点,也许你们中的某人可以帮助我。
firstCompletable
.onErrorComplete(t -> specificErrorHandlingOne())
.andThen(secondCompletable())
.onErrorComplete(t -> specificErrorHandlingTwo())
.andThen(thirdCompletable())
.onErrorComplete(t -> specificErrorHandlingThree())
.andThen(fourthCompletable())
.onErrorComplete(t -> specificErrorHandlingFour())
.subscribe(viewCallback::showSuccess)
Run Code Online (Sandbox Code Playgroud)
但是,例如在secondCompletable中存在错误时,将执行特定的错误处理,但随后仍在计划其他Completable。我希望如果其中一个Completables失败,整个Completables链将停止执行。我该怎么做?
我已经尝试过使用doOnError代替,但这只是在没有抛出特定错误的堆栈跟踪中结束。
想象一下,我有一个服务来检索一些作者(id,名字,姓氏,生日,国籍)和第二个服务来检索作者的书(标题,摘要,页数).我希望我的网络电话会给我一份带有他们书籍的作者名单.
为了说明,我的服务会返回AuthorOResponse和BookResponse,我想发出一个Observable of AuthorObject.
public class AuthorResponse {
private String id;
private String firstname;
private String lastname;
private Date birthday;
private String nationality;
}
public class BookResponse {
private String title;
private String summary;
private int pageNumber;
}
public class AuthorObject {
private String id;
private String firstname;
private String lastname;
private Date birthday;
private String nationality;
private List<BookObject> books
}
Run Code Online (Sandbox Code Playgroud)
我的服务就像是
Observable<List<AuthorResponse>> getAuthors()
Observable<List<BookResponse>> getAuthorBooks(String authorId)
Run Code Online (Sandbox Code Playgroud)
我想用它们来发出一个Observable,但由于我对getAuthorBooks的每次调用都需要一个作者,所以无法弄明白怎么做.
我想出了一些看起来像这样的东西,但我有一个问题,因为我的concatMap让我"松散"了我的作者.
myServices.getAuthors()
.map(author -> createAuthor())
.concatMap(author -> mWebService.getAuthorBooks(author.getId())
.flatMapIterable(data -> data.items)
.map(book …Run Code Online (Sandbox Code Playgroud) 我想知道是否有一种干净的方法来实现一个observable,它可以过滤掉在最近发出的事件之后的时间窗口内发生的任何事件?
我目前有这个:
source.timeInterval(TimeUnit.MILLISECONDS)
.filter(new Predicate<Timed<Object>>() {
final long TIME_LIMIT = 10 * 1000;
long totalTime = 0;
@Override
public boolean test(@NonNull Timed<Object> objectTimed) throws Exception {
totalTime += objectTimed.time();
if (totalTime > TIME_LIMIT) {
totalTime = 0;
return true;
}
return false;
}
})
.subscribe(objectTimed -> {
doSomething(objectTimed)
});
Run Code Online (Sandbox Code Playgroud)
这在技术上可以解决问题,但是在过滤器中需要一些额外的状态,这有点难看并且阻止我使用lambda.相反,我想看看是否有办法组成可以做同样事情的观察者.
ReactiveX不偏向某些特定的并发或异步性源.可以使用线程池,事件循环,非阻塞I/O,演员(例如来自Akka)或任何适合您的需求,风格或专业知识的实现来实现Observable. 客户端代码将其与Observable的所有交互视为异步,无论您的底层实现是阻塞还是非阻塞,然而您选择实现它.
我没有得到这个部分 - " 你的底层实现是阻塞还是非阻塞 ".
你能解释一下吗?或者一些示例代码来解释这个?
如何合并两个BehaviorSubjects,使它们表现得像一个BehaviorSubject?
我有这样的事情:
class Solution {
public static void main(String[] args) {
Subject<List<Integer>> left = BehaviorSubject.createDefault(Arrays.asList(1, 2, 3));
Subject<List<Integer>> right = BehaviorSubject.createDefault(Arrays.asList(4, 5, 6));
Single<List<Integer>> merged = left.mergeWith(right).reduce(new ArrayList<Integer>(), (l, r) -> {
List<Integer> merged1 = new ArrayList<>(l.size() + r.size());
merged1.addAll(l);
merged1.addAll(r);
return merged1;
});
merged.subscribe(System.out::println);
}
}
Run Code Online (Sandbox Code Playgroud)
我希望得到一些东西[1, 2, 3, 4, 5, 6],但subscribe什么都不打印.
我的问题是关于使用RxJava for Android来处理来自Retrofit调用的数据的可能性.
我刚刚开始使用这些库,所以如果我的问题很简单,请耐心等待.
这是我的情景.
我有一个从服务器返回的json,看起来像这样
{ <--- ObjectResponse
higher_level: {
data: [
{
...
some fields,
some inner object: {
....
},
other fields
}, <----- obj 1
....,
{
....
}<---- obj n
]<-- array of SingleObjects
}
} <--- whole ObjectResponse
Run Code Online (Sandbox Code Playgroud)
我已经有改造得到这个响应并在ObjectResponse中解析.解析这个对象,我可以获得一个我可以像往常一样传递给我的RecyclerView适配器的List.
所以改造返回了ObjectResponse,它是整个服务器答案的模型,在改装回调中我操纵ObjectResponse来提取我的List然后传递给我的适配器.
现在,我有这样的事情
Call<ObjectResponse> call = apiInterface.getMyWholeObject();
call.enqueue(new Callback<ObjectResponse>() {
@Override
public void onResponse(Call<ObjectResponse> call, Response<ObjectResponse> response) {
//various manipulation based on response.body() that in the ends
// lead to a List<SingleObject>
mView.sendToAdapter(listSingleObject)
}
@Override
public …Run Code Online (Sandbox Code Playgroud) 我有以下@Dao,提供Flowable<User>流:
@Dao
interface UsersDao {
@Query("SELECT * FROM users")
fun loadUsers(): Flowable<List<User>>
}
Run Code Online (Sandbox Code Playgroud)
我希望流的订户在那里发生一些变化时立即接收数据库的更新.订阅Room Flowable我将获得开箱即用的功能.
我想要的是:如果数据库为空我想执行Web请求并将用户保存到数据库中.订阅者将自动接收刚刚发生的新更新.
现在我希望存储库的客户端不要知道所有的初始化逻辑:他所做的一切 - 他执行usersRepository.loadUsers().所有这些魔法应该发生在存储库类中:
class UsersRepository @Inject constructor(
private val api: Api,
private val db: UsersDao
) {
fun loadUsers(): Flowable<List<User>> {
...
}
}
Run Code Online (Sandbox Code Playgroud)
当然我可以使用以下方法:
fun loadUsers(): Flowable<List<User>> {
return db.loadTables()
.doOnSubscribe {
if (db.getCount() == 0) {
val list = api.getTables().blockingGet()
db.insert(list)
}
}
}
Run Code Online (Sandbox Code Playgroud)
但我想在不使用副作用(doOn...运算符)的情况下构造流.我尝试过,composing()但没有多大帮助.被困在这一段时间了.
在RxJava1中,flatmap有一个重载的方法,允许您保留源值并将其传递到流中。
我从以下博客文章中获得了这些知识
但是,转到RxJava2,我似乎找不到它。我检查了Rx1和Rx2的更改,但未列出。我想知道它是否仍然存在,但也许我找的地方不对。
我正在使用Single顺便说一句。
我有一门课Student,它有两个字段grade和school。这两个字段都需要从远程服务器中获取。当返回两个结果时,我新建了一个Student对象。
在的帮助下RxJava,我以两种方式完成了此操作,一种是flatMapin zip运算符,另一种是in 运算符。
Observable<String> gradeObservable =
Observable.create((ObservableOnSubscribe<String>) emitter -> {
Thread.sleep(1000);
emitter.onNext("senior");
}).subscribeOn(Schedulers.io());
Observable<String> schoolObservable =
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("MIT");
}).subscribeOn(Schedulers.io());
Run Code Online (Sandbox Code Playgroud)
gradeObservable
.flatMap(grade ->
schoolObservable.map(school -> {
Student student = new Student();
student.grade = grade;
student.school = school;
return student;
}))
.subscribe(student -> {
System.out.println(student.grade);
System.out.println(student.school);
});
Run Code Online (Sandbox Code Playgroud)
Observable.zip(gradeObservable, schoolObservable, (grade, school) -> {
Student student = new Student();
student.grade = grade;
student.school = …Run Code Online (Sandbox Code Playgroud) 可能是一个菜鸟问题.如何为BehaviourSubject设置默认值.
我有一个有两个不同值的枚举
enum class WidgetState {
HIDDEN,
VISIBLE
}
Run Code Online (Sandbox Code Playgroud)
并且是发出状态的行为主体
val widgetStateEmitter: BehaviorSubject<WidgetState> = BehaviorSubject.create()
Run Code Online (Sandbox Code Playgroud)
写入视图逻辑时,我的发射器开始发射.但是默认情况下它是HIDDEN.如何将默认值设置为WidgetState.HIDDEN到我的发射器widgetStateEmitter?
rx-java ×10
android ×7
rx-java2 ×4
reactivex ×3
java ×2
java-8 ×2
kotlin ×2
android-room ×1
asynchronous ×1
retrofit ×1
retrofit2 ×1
rx-android ×1