使RxJava 1.1.5适应Reactor Core 3.1.0.M3

Rya*_*ren 1 java reactive-programming rx-java project-reactor

我想用采用RxJava 1.1.5与Spring WebFlux(即反应堆堆芯3.1.0.M3)库,但我无法适应ObservableFlux

我以为这是相对简单的,但是我的适配器不起作用:

import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

public static <T> Flux<T> toFlux(Observable<T> observable) {
    return Flux.create(emitter -> {
        final Subscription subscription = observable.subscribe(new Subscriber<T>() {
            @Override
            public void onNext(T value) {
                emitter.next(value);
            }
            @Override
            public void onCompleted() {
                emitter.complete();
            }
            @Override
            public void onError(Throwable throwable) {
                emitter.error(throwable);
            }
        });
        emitter.onDispose(subscription::unsubscribe);
    });
}
Run Code Online (Sandbox Code Playgroud)

我已经验证过,onNext并且onCompleted都以正确的顺序被调用,但是我Flux始终是空的。有人看到我在做什么错吗?

在相关说明中,为什么反应堆插件中没有RxJava 1适配器?

aka*_*okd 5

使用RxJavaReactiveStreams适配器将您Observable变成Publisher,然后Flux.fromPublisher()使用它。

compile 'io.reactivex:rxjava-reactive-streams:1.2.1'

Observable<T> o = ...

Flux.from(RxReactiveStreams.toPublisher(o));
Run Code Online (Sandbox Code Playgroud)

在相关说明中,为什么反应堆插件中没有RxJava 1适配器?

他们不想支持或鼓励使用旧技术,我完全同意。