RxJava + WebSocket - How do I add an Observable to a WebSocket listener?

DIR*_*AVE 5 android rx-java okhttp android-mvvm rx-java2

I have a ViewModel that observes an RxJava Observable in my MainRepo class. I am trying to get the WebSocketListener in my MainRepo class to emit events, but I'm not sure how to do that.

MainRepo class:

private WebSocket ws;

public void createWsConnection() {
        Request request = new Request.Builder()
                .url(Constants.WEBSOCKET_ENDPOINT)
                .addHeader(Constants.WEBSOCKET_HEADERS_KEY, Constants.USER_ID)
                .build();

        OkHttpClient client = new OkHttpClient
                .Builder()
                .pingInterval(30, TimeUnit.SECONDS)
                .build();

        this.ws = client.newWebSocket(request, webSocketListener);
    }

This is where I'm confused. I don't know how to use the websocket together with an RxJava Observable.

public Observable<String> createListener(){
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) {
                 //I don't know what to put here in order to emit messages
                 //back to my ViewModel class using the websocket listener
            }
        });
    }

The websocket listener:

 private WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            Timber.d("Ws connection opened: %s", response);
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            // response may be null on failure, so let the formatter handle it
            Timber.e(t, "Ws connection failure: %s", response);

        }
    };

A function in my ViewModel class that observes the Observable from my MainRepo class:

public void connectToWs(){
        mainRepo.createListener()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Timber.d("Subscribed");
            }

            @Override
            public void onNext(@NonNull String s) {
                Timber.d("Message: " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Timber.e(e, "Something went wrong.");
            }

            @Override
            public void onComplete() {
                Timber.d("On complete.");
            }
        });
    }

Gus*_*avo 2

Create a PublishSubject and change your createListener method to return it:

private final PublishSubject<String> publishSubject = PublishSubject.create();

public Observable<String> createListener(){
    return publishSubject;
}

PublishSubject is an Observable, so note that you don't need to change the method signature, but I suggest renaming the method to observeMessages.

Then, in your websocket listener, you can use the onNext method to send messages to the PublishSubject. You should also call onComplete in the onClosed method and onError in the onFailure method:

 private WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            Timber.d("Ws connection opened: %s", response);
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");

            publishSubject.onComplete();
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

            publishSubject.onNext(text);
        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            // response may be null on failure, so let the formatter handle it
            Timber.e(t, "Ws connection failure: %s", response);

            publishSubject.onError(t);
        }
    };
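
One thing to keep in mind with this approach: a PublishSubject is "hot". It does not replay messages that arrived before the ViewModel subscribed, and once onComplete or onError has been called it delivers nothing further, so a reconnect would need a fresh subject. The sketch below illustrates that ordering without any dependencies. MiniSubject, MessageListener and WsBridgeDemo are hypothetical stand-ins written for this example, not RxJava or OkHttp classes, and the sketch is single-threaded and simplified (no error channel, no disposal).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, dependency-free stand-in for a hot subject; not RxJava's class. */
final class MiniSubject {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private boolean terminated = false;

    void subscribe(Consumer<String> onNext) {
        // No replay: a subscriber only sees messages emitted after this call.
        if (!terminated) {
            subscribers.add(onNext);
        }
    }

    void onNext(String message) {
        if (terminated) {
            return; // nothing is delivered after a terminal event
        }
        for (Consumer<String> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(message);
        }
    }

    void onComplete() {
        terminated = true;
        subscribers.clear();
    }
}

/** Hypothetical callback interface playing the role of the websocket listener. */
interface MessageListener {
    void onMessage(String text);
    void onClosed();
}

final class WsBridgeDemo {
    /** Simulates one connection lifecycle and returns what the subscriber received. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniSubject subject = new MiniSubject();

        // The listener forwards its callbacks into the subject, as in the answer above.
        MessageListener listener = new MessageListener() {
            @Override public void onMessage(String text) { subject.onNext(text); }
            @Override public void onClosed() { subject.onComplete(); }
        };

        listener.onMessage("early");                    // no subscriber yet: dropped
        subject.subscribe(m -> received.add("A:" + m)); // the "ViewModel" subscribes
        listener.onMessage("hello");                    // delivered to the subscriber
        listener.onClosed();                            // terminal event
        listener.onMessage("late");                     // ignored after completion
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [A:hello]
    }
}
```

In practice this suggests subscribing in the ViewModel before calling createWsConnection(), so that no early messages are missed.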