Rx Java 2:如何包装回调?

Mik*_*679 7 java android callback rx-java2

我有这个代码在Rx Java 1中包装一个回调并且它编译得很好,但是现在我已经切换到RX Java 2它不能编译...... Rx Java 2中的等价物是什么?

return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
            @Override
            public void call(AsyncEmitter<Integer> emitter) {

                transObs.setTransferListener(new TransferListener() {
                    @Override
                    public void onStateChanged(int id, TransferState state) {
                        if (state == TransferState.COMPLETED)
                            emitter.onCompleted();
                    }

                    @Override
                    public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

                    }

                    @Override
                    public void onError(int id, Exception ex) {
                        emitter.onError(ex);
                    }
                });

                emitter.setCancellation(new AsyncEmitter.Cancellable() {
                    @Override
                    public void cancel() throws Exception {

                        transObs.cleanTransferListener();
                    }
                });
            }
        }, AsyncEmitter.BackpressureMode.BUFFER);
Run Code Online (Sandbox Code Playgroud)

更新:

我想出了这个,但你是否需要处理背压,因为它是一个oncreate电话?

 return Observable.create(new ObservableOnSubscribe<List<DigitsUser>>() {

        @Override
        public void subscribe(final ObservableEmitter<List<DigitsUser>> emitter) throws Exception {

            mDigitFriends.findFriends((gotEm, users) -> {
                emitter.onNext(users);
            });

            emitter.setCancellable(() -> {
                emitter.onNext(null);
            });
        }
    });
Run Code Online (Sandbox Code Playgroud)

ajp*_*e33 6

如果你担心背压,你应该使用Flowable类.以下是RxJava2 Wiki的引用:

实际上,1.x fromEmitter(以前来自async)已经重命名为Flowable.create.

以下是使用Flowable类的示例:

 return Flowable.create(new FlowableEmitter<List<DigitsUser>>() {

        @Override
        public void subscribe(final FlowableEmitter<List<DigitsUser>> emitter) throws Exception {

            mDigitFriends.findFriends((gotEm, users) -> {
                emitter.onNext(users);
            });

            emitter.setCancellable(() -> {
                emitter.onNext(null);
            });
        }
    }, BackpressureStrategy.BUFFER);
Run Code Online (Sandbox Code Playgroud)

  • 在 RxJava 2 中不允许发出 null,这段代码只会崩溃 (2认同)