通常使用Reactive Streams Processor作为事件总线是否可以?

Kev*_*ede 3 event-bus rx-java reactive-streams

我开始学习反应流,因为我很好奇使用RxJava作为更传统事件总线的替代品的新趋势. 这篇博文是对其完成方式的典型描述.如果我理解正确,RxJava 1.x并不是Reactive Streams的严格实现,但它非常相似.2.0版包含一些符合要求的类,或至少通过TCK,因此该代码的更新版本可能看起来有点不同.

public class UserLocationModel {

  private PublishSubject<LatLng> subject = PublishSubject.create();

  public void setLocation(LatLng latLng) {
    subject.onNext(latLng);
  }

  public Observable<LatLng> getUserLocation() {
    return subject;
  }
}
Run Code Online (Sandbox Code Playgroud)

在Reactive Streams术语中,我认为subject是a Processor,它既是a Publisher又是a Subscriber.

问题是,呼吁onNextSubscriber未订购任何东西,似乎违反了无流规范,特别是排除1.9.

这仅仅是一个实现细节吗? 我是否正确地认为您通常不能依赖此协议来实现兼容的Reactive Streams实施?

aka*_*okd 7

SubjectProcessor标准RxJava 2的s和s是放宽的,因此onSubscribe在调用其他方法之前不必调用它们.这部分是由于传统性为1.x受试者没有onSubscribe,部分原因是RxJava 2处理器不能通过选择协调Subscriber侧面和Publisher侧面之间的请求,因此没有用于a Subscription.

如果您订购Processor符合RS 的RxJava Publisher,它们似乎会尽可能地请求Long.MAX_VALUE和转发信号.如果您订阅符合SubscriberRxJava Processor的RS ,他们将尊重那些Subscribers 的背压并且永远不会溢出它们,但是,缺少请求可能会导致个人MissingBackpressureException被释放并被Subscriber"抛出".有一个自定义Publisher扩展库,做协调请求.

我是否正确地认为您通常不能依赖此工作与合规的Reactive Streams实施.

规范中没有任何内容,因此未在TCK中进行测试,Processor如果没有接收到onSubscribe呼叫但它需要它会发生什么,因此,我认为这已经成为一个实现细节.

这里有两个更大的问题:

  1. 发明主题是为了将命令式世界与反应世界联系起来,并在GUI案例和非背压案件中很好地作为事件的多播器.在反应性反应多播中,它们是更好和更直接的替代方案,例如publish(Function).
  2. 事件总线的思考是向后退一步,因为你通过在一个"轨道"上铲除和排除事件来创建一个阻塞点.相比之下,反应性设计有利于单独的和通常独立的流,其中每个流可以根据需要在线程之间跳转,并且可能避免主线程直到最后一刻.