Dan*_*ali 13 android firebase rx-java
我在我的应用程序中使用Firebase以及RxJava.Firebase能够在后端数据中发生更改(添加,删除,更改等)时通知您的应用.我试图将Firebase的功能与RxJava结合起来.
调用我正在侦听的数据Leisure,以及包含a 和更新类型(添加,删除,移动,更改)的Observable发出.LeisureUpdateLeisure
这是我的方法,允许订阅此事件.
private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;
@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {
@Override
public void call(final Subscriber<? super LeisureUpdate> subscriber) {
leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
}
@Override
public void onChildChanged(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
}
@Override
public void onChildRemoved(DataSnapshot dataSnapshot) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
}
@Override
public void onChildMoved(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
}
@Override
public void onCancelled(FirebaseError firebaseError) {
subscriber.onError(new Error(firebaseError.getMessage()));
}
});
}
});
}
leisureUpdatesSubscriptionsCount++;
return leisureUpdatesObservable;
}
Run Code Online (Sandbox Code Playgroud)
首先,我想使用Observable.fromCallable()方法来创建Observable,但我想这是不可能的,因为Firebase使用回调,对吧?
我保留了一个实例,Observable以便始终拥有Observable多个Subscriber可以订阅的实例.
当每个人都取消订阅并且我需要停止收听Firebase中的事件时,问题就出现了.Observable如果还有任何订阅,我还是没有找到理解.所以我一直计算有多少来电得subscribeToLeisuresUpdates(),有leisureUpdatesSubscriptionsCount.
然后每当有人想要取消订阅时,就必须打电话
@Override
public void unsubscribeFromLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
return;
}
leisureUpdatesSubscriptionsCount--;
if (leisureUpdatesSubscriptionsCount == 0) {
firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
leisureUpdatesObservable = null;
}
}
Run Code Online (Sandbox Code Playgroud)
这是我发现Observable在有订阅者时发出项目的唯一方法,但我觉得必须有一种更简单的方法,特别是当没有更多订阅者收听可观察对象时.
遇到类似问题或采用不同方法的人?
您可以使用Observable.fromEmitter,这些内容
return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() {
@Override
public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) {
final ValueEventListener listener = new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
// process update
LeisureUpdate leisureUpdate = ...
leisureUpdateEmitter.onNext(leisureUpdate);
}
@Override
public void onCancelled(DatabaseError databaseError) {
leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage()));
mDatabaseReference.removeEventListener(this);
}
};
mDatabaseReference.addValueEventListener(listener);
leisureUpdateEmitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
mDatabaseReference.removeEventListener(listener);
}
});
}
}, Emitter.BackpressureMode.BUFFER);
Run Code Online (Sandbox Code Playgroud)
将其放在最后的 Observable.create() 中。
subscriber.add(Subscriptions.create(new Action0() {
@Override public void call() {
ref.removeEventListener(leisureUpdatesListener);
}
}));
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10285 次 |
| 最近记录: |