取消流onData

Zie*_*mdi 11 dart

我有一个事件总线,可以处理我应用程序中的所有中央事件。我有一种特殊情况,即我有一系列异步动作,只想执行一次(发生特殊事件时),所以我必须启动一个异步函数,使其失去控制,其他函数将触发一个事件。我的第二个动作,依此类推。

所以我需要启动动作一,然后听事件总线等待动作一触发(不直接)触发将启动动作二的事件,依此类推...

自然,一旦执行了序列的每个元素,我就想停止监听触发它的事件。

为此,我想像了一个ConsumerOnce(event,action)函数,该函数将订阅总线,等待预期的事件,在接收到事件时执行该动作,并在动作启动后立即取消订阅(异步)

  final StreamController<Map<PlaceParam, dynamic>> _controller =
  new StreamController<Map<PlaceParam, dynamic>>.broadcast();

  void consumeOnce(PlaceParam param, Function executeOnce) {
    StreamSubscription subscription = _controller.stream.listen((Map<PlaceParam, dynamic> params) {
      if(params.containsKey(param)) {
        executeOnce();
        subscription.cancel(); //can't access, too early: not created yet
      }
    });
  }
Run Code Online (Sandbox Code Playgroud)

问题是我无法在回调的主体中访问变量预订,因为当时尚未创建变量预订

由于监听器不会按照订阅顺序执行任何担保,因此我无法注册另一个将删除我的订阅的订阅者(即使执行顺序得到保证,无论如何我都会找到无法删除的订阅来找到自己:负责删除我的原始订阅)...

有什么想法吗?

这种模式可以解决我的问题,但是我觉得它并不优雅:

@Injectable()
class EventBus<K, V> {
  final StreamController<Map<PlaceParam, dynamic>> _controller =
  new StreamController<Map<PlaceParam, dynamic>>.broadcast();

  Future<Null> fire(Map<PlaceParam, dynamic> params) async {
    await _controller.add(params);
  }

  Stream<Map<PlaceParam, dynamic>> getBus() {
    return _controller.stream;
  }


  void consumeOnce(PlaceParam param, Function executeOnce) {
    SubscriptionRemover remover = new SubscriptionRemover(param, executeOnce);
    StreamSubscription subscription = _controller.stream.listen(remover.executeOnce);
    remover.subscription = subscription;
  }
}

class SubscriptionRemover {
  PlaceParam param;
  Function executeOnce;
  StreamSubscription subscription;

  SubscriptionRemover(this.param, this.executeOnce);

  void execute(Map<PlaceParam, dynamic> params) {
    if (params.containsKey(param)) {
      executeOnce();
      subscription.cancel();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

但我不太喜欢,因为从理论上讲,该事件可能在两个调用之间发生:

    StreamSubscription subscription = _controller.stream.listen(remover.executeOnce); //event may occur now!!!
    remover.subscription = subscription;
Run Code Online (Sandbox Code Playgroud)

我认为存在一个方法:_controller.stream.remove(Function fn)会更加直接和清晰。

我对吗?还是有我没想到的方法?

lrn*_*lrn 13

问题是我无法在回调的主体中访问变量预订,因为当时尚未创建变量预订

那不是完全正确的-您无法访问的是subscription 变量。Dart不允许变量声明引用自己。当变量仅在尚未执行的闭包中引用时,这有时会很烦人。

解决方案是onData在进行侦听之后预先声明变量或更新-listener:

// Pre-declare variable (can't be final, though).
StreamSubscription<Map<PlaceParam, dynamic>> subscription;
subscription = stream.listen((event) {
  .... subscription.cancel();
});
Run Code Online (Sandbox Code Playgroud)

要么

final subscription = stream.listen(null);
subscription.onData((event) {  // Update onData after listening.
  .... subscription.cancel(); ....
});
Run Code Online (Sandbox Code Playgroud)

在某些情况下,您还不能访问订阅对象,但这仅在流违反Stream合同并在监听时立即开始发送事件的情况下才可能。流一定不能这样做,它们必须等到以后的微任务才传递第一个事件,这样被调用的代码listen才有时间接收订阅并将其分配给变量。使用同步流控制器可能违反合同(这是为什么应谨慎使用同步流控制器的原因之一)。


Gün*_*uer 5

只需事先声明变量,然后就可以在回调中访问它:

StreamSubscription subscription;
subscription = _controller.stream.listen((Map<PlaceParam, dynamic> params) {
  if(params.containsKey(param)) {
    executeOnce();
    subscription.cancel(); 
  }
});
Run Code Online (Sandbox Code Playgroud)


Col*_*son 5

只调用一次的回调是很恶心的。我认为返回一个更符合 Dart 风格Future

Future<Null> consumeOnce(PlaceParam param) async {
  await _controller.stream.singleWhere((params) => params.containsKey(param));
}
Run Code Online (Sandbox Code Playgroud)

鉴于这是一句俏话,我不确定它是否真的有必要,除非它发生很多。EventBus 的客户端可以getBus().singleWhere同样轻松地调用。

如果您确实执着于使用回调的想法,您可以保留参数executeOnce并在等待对 的调用后调用它singleWhere,但我真的想不出这样做会更好的情况。