DIR*_*AVE 5 android rx-java okhttp android-mvvm rx-java2
我有一个ViewModel正在观察Observable我MainRepo班级中的 RxJava 。我试图让我WebsocketListener在MainRepo课堂上发出事件,但我不确定如何这样做。
MainRepo 类:
private WebSocket ws;
public void createWsConnection() {
Request request = new Request.Builder()
.url(Constants.WEBSOCKET_ENDPOINT)
.addHeader(Constants.WEBSOCKET_HEADERS_KEY, Constants.USER_ID)
.build();
OkHttpClient client = new OkHttpClient
.Builder()
.pingInterval(30, TimeUnit.SECONDS)
.build();
this.ws = client.newWebSocket(request, webSocketListener);
}
Run Code Online (Sandbox Code Playgroud)
这是我感到困惑的地方。我不知道如何将 websocket 与 RxJava observable 一起使用。
public Observable<String> createListener(){
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) {
//I don't know what to put here in order to emit messages
//back to my ViewModel class using the websocket listener
}
});
}
Run Code Online (Sandbox Code Playgroud)
websocket 侦听器:
private WebSocketListener webSocketListener = new WebSocketListener() {
@Override
public void onOpen(@NotNull WebSocket webSocket, Response response) {
Timber.d("Ws connection opened...", response.toString());
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
Timber.d("Ws connection closing...");
}
@Override
public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
Timber.d("Ws connection closed...");
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
Timber.d("Ws incoming message.");
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
Timber.e(t, "Ws connection failure.", response.toString());
}
};
Run Code Online (Sandbox Code Playgroud)
ViewModel 类中的一个函数正在观察我的 MainRepo 类中的 Observable:
public void connectToWs(){
mainRepo.createListener()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Timber.d("Subscribed");
}
@Override
public void onNext(@NonNull String s) {
Timber.d("Message: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Timber.e(e, "Something went wrong.");
}
@Override
public void onComplete() {
Timber.d("On complete.");
}
});
}
Run Code Online (Sandbox Code Playgroud)
创建一个PublishSubject并更改您的createListener方法以返回它:
private PublishSubject<String> publishSubject = PublishSubject.create<String>();
public Observable<String> createListener(){
return publishSubject;
}
Run Code Online (Sandbox Code Playgroud)
PublishSubject 是一个 Observable,因此请注意,您不需要更改方法签名,但我建议您将方法名称重命名为observeMessages.
然后在您的 websocket 侦听器中,您可以使用onNext方法将消息发送到 PublishSubject 。您还应该在 onClosed 方法中调用onComplete ,在 onFailure 方法中调用 onError :
private WebSocketListener webSocketListener = new WebSocketListener() {
@Override
public void onOpen(@NotNull WebSocket webSocket, Response response) {
Timber.d("Ws connection opened...", response.toString());
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
Timber.d("Ws connection closing...");
}
@Override
public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
Timber.d("Ws connection closed...");
publishSubject.onComplete();
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
Timber.d("Ws incoming message.");
publishSubject.onNext(text);
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
Timber.e(t, "Ws connection failure.", response.toString());
publishSubject.onError(t);
}
};
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
178 次 |
| 最近记录: |