在RxJava中,如何启动从API生成的潜在无限事件流?

sco*_*awg 4 android reactive-programming rx-java rx-android

我有一个可供客户使用的API,可以简化为:

public class API {
  public void sendEvent(Event e);
}
Run Code Online (Sandbox Code Playgroud)

Event每当客户端调用API(技术上将Binder转换为Service衍生产品)时,实例就会进入我的系统,然后处理,过滤并分派到其他内部组件.我不关心过去的事件,只关注用户订阅时可用的事件.对于Rx范例来说,这似乎是一种天生的选择,而我只是在沾沾自喜.

我需要一个创建一次的Observable,允许多个订阅者,并且可以Event通过反应管道向观察者发送其实例.一个Subject看起来适合什么,我希望做的(尤其是这个答案这个问题和我产生共鸣).

其他RxJava用户推荐什么?

njz*_*zk2 8

例如,关于我的简短评论:

public class API implements OnSubscribe<Event> {
    private List<Subscriber<Event>> subscribers = new ArrayList<>();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        for (Subscriber<Event> sub : subscribers) {
            sub.onNext(event);
        }
    }
    public void call(Subscriber<Event> sub) {
        subscribers.add(sub);
    }
}
Run Code Online (Sandbox Code Playgroud)

然后你可能在某个地方有一个实例: API api = ...

您的Observable是这样获得的:Observable.create(api);然后您可以使用Observable执行任何正常操作.

对未订阅的Subscribers 的过滤留给读者作为练习.

编辑

更多研究表明PublishSubject应该有所帮助:

public class API {
    private PublishSubject<Event> subject = PublishSubject.create();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        // Then publish it
        subject.onNext(event);
    }
    public Observable<Event> getObservable() {
        return subject.asObservable();
    }
}
Run Code Online (Sandbox Code Playgroud)

这样,您可以订阅此Observable,并且每次发送事件时,都会将API其发布给所有订阅者.

使用这样:

API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
Run Code Online (Sandbox Code Playgroud)