将Firebase实时数据侦听器与RxJava结合使用

Dan*_*ali 13 android firebase rx-java

我在我的应用程序中使用Firebase以及RxJava.Firebase能够在后端数据中发生更改(添加,删除,更改等)时通知您的应用.我试图将Firebase的功能与RxJava结合起来.

调用我正在侦听的数据Leisure,以及包含a 和更新类型(添加,删除,移动,更改)的Observable发出.LeisureUpdateLeisure

这是我的方法,允许订阅此事件.

private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;

@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {

            @Override
            public void call(final Subscriber<? super LeisureUpdate> subscriber) {
                leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
                    }

                    @Override
                    public void onCancelled(FirebaseError firebaseError) {
                        subscriber.onError(new Error(firebaseError.getMessage()));
                    }
                });
            }
        });
    }
    leisureUpdatesSubscriptionsCount++;
    return leisureUpdatesObservable;
}
Run Code Online (Sandbox Code Playgroud)

首先,我想使用Observable.fromCallable()方法来创建Observable,但我想这是不可能的,因为Firebase使用回调,对吧?

我保留了一个实例,Observable以便始终拥有Observable多个Subscriber可以订阅的实例.

当每个人都取消订阅并且我需要停止收听Firebase中的事件时,问题就出现了.Observable如果还有任何订阅,我还是没有找到理解.所以我一直计算有多少来电得subscribeToLeisuresUpdates(),有leisureUpdatesSubscriptionsCount.

然后每当有人想要取消订阅时,就必须打电话

@Override
public void unsubscribeFromLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        return;
    }
    leisureUpdatesSubscriptionsCount--;
    if (leisureUpdatesSubscriptionsCount == 0) {
        firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
        leisureUpdatesObservable = null;
    }
}
Run Code Online (Sandbox Code Playgroud)

这是我发现Observable在有订阅者时发出项目的唯一方法,但我觉得必须有一种更简单的方法,特别是当没有更多订阅者收听可观察对象时.

遇到类似问题或采用不同方法的人?

Fra*_*esc 6

您可以使用Observable.fromEmitter,这些内容

    return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() {
        @Override
        public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) {
            final ValueEventListener listener = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot dataSnapshot) {
                    // process update
                    LeisureUpdate leisureUpdate = ...
                    leisureUpdateEmitter.onNext(leisureUpdate);
                }

                @Override
                public void onCancelled(DatabaseError databaseError) {
                    leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage()));
                    mDatabaseReference.removeEventListener(this);
                }
            };
            mDatabaseReference.addValueEventListener(listener);
            leisureUpdateEmitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    mDatabaseReference.removeEventListener(listener);
                }
            });
        }
    }, Emitter.BackpressureMode.BUFFER);
Run Code Online (Sandbox Code Playgroud)


Vin*_*raj 3

将其放在最后的 Observable.create() 中。

subscriber.add(Subscriptions.create(new Action0() {
                    @Override public void call() {
                        ref.removeEventListener(leisureUpdatesListener);
                    }
                }));
Run Code Online (Sandbox Code Playgroud)