RxJava:将 Rx Flowable 拆分为多个流

Mic*_*zyk 1 java stream rx-java

我想对流执行一些操作,然后将流分成两个流,然后分别处理它们。

显示问题的示例:

Flowable<SuccessfulObject> stream = Flowable.fromArray(
        new SuccessfulObject(true, 0),
        new SuccessfulObject(false, 1),
        new SuccessfulObject(true, 2));

stream = stream.doOnEach(System.out::println);

Flowable<SuccessfulObject> successful = stream.filter(SuccessfulObject::isSuccess);
Flowable<SuccessfulObject> failed = stream.filter(SuccessfulObject::isFail);

successful.doOnEach(successfulObject -> {/*handle success*/}).subscribe();
failed.doOnEach(successfulObject -> {/*handle fail*/}).subscribe();
Run Code Online (Sandbox Code Playgroud)

班级:

class SuccessfulObject {
    private boolean success;
    private int id;

    public SuccessfulObject(boolean success, int id) {
        this.success = success;
        this.id = id;
    }

    public boolean isSuccess() {
        return success;
    }
    public boolean isFail() {
        return !success;
    }

    public void setSuccess(boolean success) {
        this.success = success;
    }

    @Override
    public String toString() {
        return "SuccessfulObject{" +
                "id=" + id +
                '}';
    }
}
Run Code Online (Sandbox Code Playgroud)

但是这段代码将所有元素打印两次,而我想在仅拆分一次之前执行所有操作。

输出:

OnNextNotification[SuccessfulObject{id=0}]
OnNextNotification[SuccessfulObject{id=1}]
OnNextNotification[SuccessfulObject{id=2}]
OnCompleteNotification
OnNextNotification[SuccessfulObject{id=0}]
OnNextNotification[SuccessfulObject{id=1}]
OnNextNotification[SuccessfulObject{ id=2}]
OnCompleteNotification

我如何处理流以接收此行为?

aka*_*okd 5

用于publish共享源订阅:

Flowable<Integer> source = Flowable.range(1, 5);

ConnectableFlowable<Integer> cf = source.publish();

cf.filter(v -> v % 2 == 0).subscribe(v -> System.out.println("Even: " + v));

cf.filter(v -> v % 2 != 0).subscribe(v -> System.out.println("Odd: " + v));

cf.connect();
Run Code Online (Sandbox Code Playgroud)