使用RxJava生成无限序列的自然数

She*_*har 6 java reactive-programming system.reactive rx-java

我正在尝试使用RxJava编写一个简单的程序来生成无限的自然数序列.所以,到目前为止,我已经找到了两种使用Observable.timer()Observable.interval()生成数字序列的方法.我不确定这些功能是否是解决此问题的正确方法.我期待像Java 8中那样的简单函数来生成无限的自然数.

IntStream.iterate(1,value - > value +1).forEach(System.out :: println);

我尝试使用带有Observable的IntStream,但这不能正常工作.它只向第一个用户发送无限的数字流.如何正确生成无限自然数序列?

import rx.Observable;
import rx.functions.Action1;

import java.util.stream.IntStream;

public class NaturalNumbers {

    public static void main(String[] args) {
        Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
            IntStream stream = IntStream.iterate(1, val -> val + 1);
            stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
        });

        Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
        Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
        Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);
        naturalNumbers.subscribe(first);
        naturalNumbers.subscribe(second);
        naturalNumbers.subscribe(third);

    }
}
Run Code Online (Sandbox Code Playgroud)

mko*_*bit 3

问题在于naturalNumbers.subscribe(first);OnSubscribe您实现的 on 正在被调用,并且您正在forEach无限流上执行操作,因此您的程序永远不会终止。

处理它的一种方法是在不同的线程上异步订阅它们。为了轻松查看结果,我必须在流处理中引入睡眠:

Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
    IntStream stream = IntStream.iterate(1, i -> i + 1);
    stream.peek(i -> {
        try {
            // Added to visibly see printing
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }).forEach(subscriber::onNext);
});

final Subscription subscribe1 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(first);
final Subscription subscribe2 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(second);
final Subscription subscribe3 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(third);

Thread.sleep(1000);

System.out.println("Unsubscribing");
subscribe1.unsubscribe();
subscribe2.unsubscribe();
subscribe3.unsubscribe();
Thread.sleep(1000);
System.out.println("Stopping");
Run Code Online (Sandbox Code Playgroud)