rom*_*the 6 java sockets reactive-programming rx-java
我仍在尝试学习RxJava。有一件事我现在无法束手无策。每一篇试图学习我如何使用Rx的文章都向我展示了如何基于已经“可预测”的源(即一定数量的项目的序列(单个值或简单的Iterable))创建Observable。 。
通常,您会看到一些类似的内容 Observable.just()
Observable<String> observerable = Observable.just("Hello, world!");
Run Code Online (Sandbox Code Playgroud)
或Observable.from():
Observable.from("apple", "orange", "banana").subscribe(fruit -> System.out.println(fruit));
Run Code Online (Sandbox Code Playgroud)
很好,但是下面的用例呢?
我有不断通过套接字推送的消息(我没有构建它,我只是在集成)。我需要“观察”通过套接字推送的数据序列。
很多人似乎都指向Obserable.using()(这里是一个例子),但是我也不认为这是正确的解决方案。通过套接字推送的消息是不完整的,因为它们具有最大长度。我需要自己“构造”消息,因此需要在每次从套接字推送之间进行缓冲。
换句话说,我正在寻找一种方法来根据从套接字推入的数据自己构造消息,然后将其推入Observable。我一直在寻找在整个地方执行此操作的正确方法,但是我似乎找不到合适的解决方案。
怎么样Observable通过完全自定义的行为吗?
Observable.create(subscriber -> {
Socket socket = getSocket();
socket.subscribe(new SocketListener() {
@Override public void onNewFrame(Frame frame) {
// Process frame and prepare payload to the subscriber.
if (payloadReadyForExternalObserver) {
if (subscriber.isUnsubscribed()) {
// Subscriber unsubscribed, let's close the socket.
socket.close();
} else {
subscriber.onNext(payload);
}
}
}
@Override public void onSocketError(IOException exception) {
subscriber.onError(exception); // Terminal state.
}
@Override public void onSocketClosed() {
subscriber.onCompleted(); // Terminal state.
}
});
})
Run Code Online (Sandbox Code Playgroud)
但是请确保您正确实施了Observable合同,有关更多信息,请阅读https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators
| 归档时间: |
|
| 查看次数: |
5335 次 |
| 最近记录: |