当我在FlowableOnSubscribe类中调用onNext时,我的订阅者的onNext和onComplete函数不会运行

Eug*_*ene 5 java android rx-java rx-android

在使用RxJava 2的Android项目中,我FlowableonCreate我的初始活动中创建了这样的:

Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
        .doOnComplete(new MyAction())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new MySubscriber());
Run Code Online (Sandbox Code Playgroud)

FlowableOnSubscribe的实现是:

public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
    public static final String TAG = "XX MyFlOnSub1";

    @Override
    public void subscribe(FlowableEmitter<String> emitter) {
        Log.i(TAG, "subscribe");

        emitter.onNext("hello");
        emitter.onComplete();
    }
}
Run Code Online (Sandbox Code Playgroud)

这是订户实施:

public class MySubscriber implements Subscriber<String> {
    public static final String TAG = "XX MySubscriber";

    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "onSubscribe");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: " + s);
    }
}
Run Code Online (Sandbox Code Playgroud)

行动实施是:

public class MyAction implements Action {
    public static final String TAG = "XX MyAction";

    @Override
    public void run() {
        Log.i(TAG, "run");
    }
}
Run Code Online (Sandbox Code Playgroud)

在我的输出中,我期待一个日志声明onNext,但我没有看到.相反,这是我的整个输出:

02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
Run Code Online (Sandbox Code Playgroud)

这表明onNext从不运行,onComplete甚至不运行.但MyAction运行成功.

这是当我发出以下呼叫的评论时发生的事情onNext:

02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete
Run Code Online (Sandbox Code Playgroud)

在这种情况下onNext,当然不会运行,但至少onComplete运行.

我希望我能onComplete在两种情况下看到运行,并onNext在我打电话时运行emitter.onNext.我在这做错了什么?

aka*_*okd 16

您需要手动发出请求,否则Subscriber直接扩展时不会发出任何数据:

@Override
public void onSubscribe(Subscription s) {
    Log.i(TAG, "onSubscribe");
    s.request(Long.MAX_VALUE);
}
Run Code Online (Sandbox Code Playgroud)

或者,您可以延长DisposableSubscriberResourceSubscriber.