如何取消订阅PublishSubject和BehaviorSubject?

ari*_*nte 37 java subject-observer rx-java

下的subjects包你有这样的类PublishSubjectBehaviorSubject我想可以被描述为一些可用的样品Observables.

如何取消订阅这些主题?没有unsubscribe方法和调用onCompleted完全结束了Observable吗?

Sep*_*tem 70

主体是一个SubjectObservable同时,它可以像普通的可观察者一样取消订阅.使主题变得特别的是它是可观察者和观察者之间的桥梁.它可以通过重新发送它来传递它观察到的物品,它也可以发出新物品.由于承诺是期货,所以主题是可观察的.

这里是受试者家庭的简要描述:

AsyncSubject:仅发出源Observable的最后一个值

BehaviorSubject:当观察者订阅它时,发出最近发出的项和源Observable的所有后续项

PublishSubject:在订阅时发出源Observable的所有后续项

ReplaySubject:发出源Observable的所有项目,无论订阅者何时订阅.

官方文档提供了一些好的大理石图这使得它更容易理解

  • BehaviorSubject与ReplaySubject(1)基本相同吗? (2认同)
  • @DzmitryLazerka:BehaviorSubject始终具有初始值。如果要查找没有初始值的BehaviorSubject,请使用ReplaySubject(1)。 (2认同)

mar*_*oop 26

主题基本上是观测量和观察员.

Observable本质上是一个具有Observer并返回订阅的函数的东西.因此,例如,给定简单的可观察量:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(3);
            observer.onNext(2);
            observer.onNext(1);
            observer.onCompleted();

            return Subscriptions.empty();
        }
    });
Run Code Online (Sandbox Code Playgroud)

在这里我们会订阅它,为每个整数打印一行:

    Subscription sub = observable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println(integer);
        }
    });
Run Code Online (Sandbox Code Playgroud)

您可以通过致电取消订阅上述内容sub.unsubscribe().

这是一个PublishSubject,大致相同:

    PublishSubject<Integer> publishSubject = PublishSubject.create();
    Subscription subscription = publishSubject.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println(integer);
        }
    });

    publishSubject.onNext(3);
    publishSubject.onNext(2);
    publishSubject.onNext(1);
Run Code Online (Sandbox Code Playgroud)

您可以通过调用以相同的方式取消订阅subscription.unsubscribe().

  • .@vach - 按照上面的代码,你可以让用户在并行处理onNext通过执行类似如下:`认购subscription1 = publishSubject.observeOn(Schedulers.newThread())认购(...);`和`认购subscription2 = .publishSubject.observeOn(Schedulers.newThread())认购(...);` (3认同)

Pra*_*pta 5

您可以使用任何多种方法订阅的所有Subjects扩展.调用任何方法都会返回一个.Observablesubscribe(...)subscribe(...)Subscription

Subscription subscription = anySubject.subscribe(...);
Run Code Online (Sandbox Code Playgroud)

如果要停止从主题中侦听事件,请使用此subscription实例的unsubscribe()方法.

subscription.unsubscribe();
Run Code Online (Sandbox Code Playgroud)