She*_*har 6 java reactive-programming system.reactive rx-java
我正在尝试使用RxJava编写一个简单的程序来生成无限的自然数序列.所以,到目前为止,我已经找到了两种使用Observable.timer()和Observable.interval()生成数字序列的方法.我不确定这些功能是否是解决此问题的正确方法.我期待像Java 8中那样的简单函数来生成无限的自然数.
IntStream.iterate(1,value - > value +1).forEach(System.out :: println);
我尝试使用带有Observable的IntStream,但这不能正常工作.它只向第一个用户发送无限的数字流.如何正确生成无限自然数序列?
import rx.Observable;
import rx.functions.Action1;
import java.util.stream.IntStream;
public class NaturalNumbers {
public static void main(String[] args) {
Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
IntStream stream = IntStream.iterate(1, val -> val + 1);
stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
});
Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);
naturalNumbers.subscribe(first);
naturalNumbers.subscribe(second);
naturalNumbers.subscribe(third);
}
}
Run Code Online (Sandbox Code Playgroud)
问题在于naturalNumbers.subscribe(first);,OnSubscribe您实现的 on 正在被调用,并且您正在forEach无限流上执行操作,因此您的程序永远不会终止。
处理它的一种方法是在不同的线程上异步订阅它们。为了轻松查看结果,我必须在流处理中引入睡眠:
Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
IntStream stream = IntStream.iterate(1, i -> i + 1);
stream.peek(i -> {
try {
// Added to visibly see printing
Thread.sleep(50);
} catch (InterruptedException e) {
}
}).forEach(subscriber::onNext);
});
final Subscription subscribe1 = naturalNumbers
.subscribeOn(Schedulers.newThread())
.subscribe(first);
final Subscription subscribe2 = naturalNumbers
.subscribeOn(Schedulers.newThread())
.subscribe(second);
final Subscription subscribe3 = naturalNumbers
.subscribeOn(Schedulers.newThread())
.subscribe(third);
Thread.sleep(1000);
System.out.println("Unsubscribing");
subscribe1.unsubscribe();
subscribe2.unsubscribe();
subscribe3.unsubscribe();
Thread.sleep(1000);
System.out.println("Stopping");
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2958 次 |
| 最近记录: |