She*_*bic 5 java reactive-programming rx-java
我正在使用一个名为AppWarp(http://appwarp.shephertz.com)的多人游戏客户端,您可以在其中添加事件监听器,以便在事件发生时回调,让我们假设我们将讨论连接侦听器,其中你需要实现这个接口:
public interface ConnectionRequestListener {
void onConnectDone(ConnectEvent var1);
void onDisconnectDone(ConnectEvent var1);
void onInitUDPDone(byte var1);
}
Run Code Online (Sandbox Code Playgroud)
我的目标是主要创建此客户端的Reactive版本,以便在我的应用程序内部使用,而不是直接使用客户端本身(我也将在以后依赖于接口,而不是像示例中那样依赖于WarpClient本身,但是这不重要,请在最后阅读我的问题).
所以我做的如下:
1)我引入了一个新事件,名为RxConnectionEvent(主要对Connection相关事件进行分组),如下所示:
public class RxConnectionEvent {
// This is the original connection event from the source client
private final ConnectEvent connectEvent;
// this is to identify if it was Connection / Disconnection
private final int eventType;
public RxConnectionEvent(ConnectEvent connectEvent, int eventType) {
this.connectEvent = connectEvent;
this.eventType = eventType;
}
public ConnectEvent getConnectEvent() {
return connectEvent;
}
public int getEventType() {
return eventType;
}
}
Run Code Online (Sandbox Code Playgroud)
2)创建了一些事件类型如下:
public class RxEventType {
// Connection Events
public final static int CONNECTION_CONNECTED = 20;
public final static int CONNECTION_DISCONNECTED = 30;
}
Run Code Online (Sandbox Code Playgroud)
3)创建了以下observable,它发出了我的新RxConnectionEvent
import com.shephertz.app42.gaming.multiplayer.client.WarpClient;
import com.shephertz.app42.gaming.multiplayer.client.events.ConnectEvent;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
public class ConnectionObservable extends BaseObservable<RxConnectionEvent> {
private ConnectionRequestListener connectionListener;
// This is going to be called from my ReactiveWarpClient (Factory) Later.
public static Observable<RxConnectionEvent> createConnectionListener(WarpClient warpClient) {
return Observable.create(new ConnectionObservable(warpClient));
}
private ConnectionObservable(WarpClient warpClient) {
super(warpClient);
}
@Override
public void call(final Subscriber<? super RxConnectionEvent> subscriber) {
subscriber.onStart();
connectionListener = new ConnectionRequestListener() {
@Override
public void onConnectDone(ConnectEvent connectEvent) {
super.onConnectDone(connectEvent);
callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_CONNECTED));
}
@Override
public void onDisconnectDone(ConnectEvent connectEvent) {
super.onDisconnectDone(connectEvent);
callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_DISCONNECTED));
}
// not interested in this method (for now)
@Override
public void onInitUDPDone(byte var1) { }
private void callback(RxConnectionEvent rxConnectionEvent)
{
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(rxConnectionEvent);
} else {
warpClient.removeConnectionRequestListener(connectionListener);
}
}
};
warpClient.addConnectionRequestListener(connectionListener);
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
onUnsubscribed(warpClient);
}
}));
}
@Override
protected void onUnsubscribed(WarpClient warpClient) {
warpClient.removeConnectionRequestListener(connectionListener);
}
}
Run Code Online (Sandbox Code Playgroud)
4)最后我的BaseObservable如下所示:
public abstract class BaseObservable<T> implements Observable.OnSubscribe<T> {
protected WarpClient warpClient;
protected BaseObservable (WarpClient warpClient)
{
this.warpClient = warpClient;
}
@Override
public abstract void call(Subscriber<? super T> subscriber);
protected abstract void onUnsubscribed(WarpClient warpClient);
}
Run Code Online (Sandbox Code Playgroud)
我的问题主要是:我的实现是正确的还是我应该为每个事件创建单独的observable,但如果是这样,这个客户端有超过40-50个事件我是否必须为每个事件创建单独的observable?
我也使用上面的代码如下(在一个简单的"非最终"集成测试中使用它):
public void testConnectDisconnect() {
connectionSubscription = reactiveWarpClient.createOnConnectObservable(client)
.subscribe(new Action1<RxConnectionEvent>() {
@Override
public void call(RxConnectionEvent rxEvent) {
assertEquals(WarpResponseResultCode.SUCCESS, rxEvent.getConnectEvent().getResult());
if (rxEvent.getEventType() == RxEventType.CONNECTION_CONNECTED) {
connectionStatus = connectionStatus | 0b0001;
client.disconnect();
} else {
connectionStatus = connectionStatus | 0b0010;
connectionSubscription.unsubscribe();
haltExecution = true;
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
fail("Unexpected error: " + throwable.getMessage());
haltExecution = true;
}
});
client.connectWithUserName("test user");
waitForSomeTime();
assertEquals(0b0011, connectionStatus);
assertEquals(true, connectionSubscription.isUnsubscribed());
}
Run Code Online (Sandbox Code Playgroud)
我建议您避免直接扩展 BaseObservable,因为它非常容易出错。相反,尝试使用 Rx 本身为您提供的工具来创建您的 observable。
最简单的解决方案是使用PublishSubject,它既是 Observable 又是 Subscriber。侦听器只需要调用主体的onNext,主体就会发出事件。这是一个简化的工作示例:
public class PublishSubjectWarpperDemo {
public interface ConnectionRequestListener {
void onConnectDone();
void onDisconnectDone();
void onInitUDPDone();
}
public static class RxConnectionEvent {
private int type;
public RxConnectionEvent(int type) {
this.type = type;
}
public int getType() {
return type;
}
public String toString() {
return "Event of Type " + type;
}
}
public static class SimpleCallbackWrapper {
private final PublishSubject<RxConnectionEvent> subject = PublishSubject.create();
public ConnectionRequestListener getListener() {
return new ConnectionRequestListener() {
@Override
public void onConnectDone() {
subject.onNext(new RxConnectionEvent(1));
}
@Override
public void onDisconnectDone() {
subject.onNext(new RxConnectionEvent(2));
}
@Override
public void onInitUDPDone() {
subject.onNext(new RxConnectionEvent(3));
}
};
}
public Observable<RxConnectionEvent> getObservable() {
return subject;
}
}
public static void main(String[] args) throws IOException {
SimpleCallbackWrapper myWrapper = new SimpleCallbackWrapper();
ConnectionRequestListener listner = myWrapper.getListener();// Get the listener and attach it to the game here.
myWrapper.getObservable().observeOn(Schedulers.newThread()).subscribe(event -> System.out.println(event));
listner.onConnectDone(); // Call the listener a few times, the observable should print the event
listner.onDisconnectDone();
listner.onInitUDPDone();
System.in.read(); // Wait for enter
}
}
Run Code Online (Sandbox Code Playgroud)
一个更复杂的解决方案是使用onSubscribe实现之一来创建一个 observable Observable.create()。例如AsyncOnSubscibe。此解决方案的好处是可以正确处理 backperssure,因此您的事件订阅者不会被事件淹没。但就您而言,这听起来不太可能发生,因此增加的复杂性可能不值得。
| 归档时间: |
|
| 查看次数: |
2862 次 |
| 最近记录: |