daz*_*ito 2 java asynchronous reactive-programming observable rx-java
我需要观察一个对象列表,并在列表中有新对象时立即发出事件.像客户端 - 服务器应用程序一样,客户端将内容添加到列表中,服务器将新添加的内容发送到服务器中注册的所有客户端.
到目前为止,我的observable只发出一个项目而不是它之后.换句话说,当一个新对象被添加到列表中时,会有一个事件,但是当添加第二个(或第3个,第4个,第5个......)对象时,没有事件.
一旦列表中有新对象,我需要它继续发出项目.
这是我的代码,MyList让我们说我自己的实现List(与我的问题无关).我希望发出的事件是新添加的对象MyList.
private void onClickMethod() {
MyList myList = populateMyList();
onNexAction = new Action1<MyList>() {
@Override
public void call(MyList myList) {
System.out.println("call()");
TheObject theObject = myList.getNext();
System.out.println("New Event: " + theObject.getData());
}
};
myObservable = Observable.create(new Observable.OnSubscribe<MyList>() {
@Override
public void call(Subscriber<? super MyList> t) {
t.onNext(myList);
}
});
myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).subscribe(onNexAction);
}
Run Code Online (Sandbox Code Playgroud)
上面的代码只会发出第一个添加的对象.但是如果我在call()中放置一个while循环,它将发出所有事件,就像我想要的那样.但我觉得不适合做我想做的事.我觉得我更像是轮询而不是异步.
这是完全相同的代码,但有while循环:
private void onClickMethod() {
MyList myList = populateMyList();
onNexAction = new Action1<MyList>() {
@Override
public void call(MyList myList) {
System.out.println("call()");
while (myList.size() > 0) { // The while cycle
TheObject theObject = myList.getNext();
System.out.println("New Event: " + theObject.getData());
}
}
};
myObservable = Observable.create(new Observable.OnSubscribe<MyList>() {
@Override
public void call(Subscriber<? super MyList> t) {
t.onNext(myList);
}
});
myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).subscribe(onNexAction);
}
Run Code Online (Sandbox Code Playgroud)
那么,我在这里做错了什么?一旦MyList中有新对象,我怎样才能让我的observable发出新事件?
编辑:
正如Tassos Bassoukos所建议的那样,我应用了distinct()这样的运算符:
myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).distinct().subscribe(onNexAction);
Run Code Online (Sandbox Code Playgroud)
但它并没有解决我的问题.仅发出第一个添加的对象.
这是一个典型的案例PublishSubject:
class MyList<T> {
final PublishSubject<T> onAdded = PublishSubject.create();
void add(T value) {
// add internally
onAdded.onNext(value);
}
Observable<T> onAdded() {
return onAdded;
}
}
MyList<Integer> list = populate();
Subscription s = list.onAdded()
.subscribe(v -> System.out.println("Added: " + v));
list.add(1); // prints "Added: 1"
list.add(2); // prints "Added: 2"
list.add(3); // prints "Added: 3"
s.unsubscribe(); // not interested anymore
list.add(4); // doesn't print anything
list.add(5); // doesn't print anything
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
554 次 |
| 最近记录: |