如何使用StreamChannel在Dart中进行双向通信?

Pac*_*ane 0 stream dart

有没有办法让Dart Streams await stream.first连续多次执行某些操作 ,使用有点像堆栈/队列的流?

据我所知,你可以通过在现有的Single-substription流上使用asBroadcastStream来做到这一点,但我觉得这并不理想.

也许有一种缓冲原语与一些Rx包或类似的东西?

我的用例如下:

我有一个StreamChannel(IOWebSocketChannel),我想按特定顺序发送/接收消息.

即:

Send Message0
Receive Message1
Send Message2
Receive Message3
Run Code Online (Sandbox Code Playgroud)

我肯定知道Message1服务器收到后才会到达Message0 (等的Message2Message3)

mat*_*rey 7

Dart对于基于推送的事件基于拉的资源读取都具有单一抽象的优势(并且在我看来,不利).这导致了一系列其他混乱,例如"我是否必须取消流订阅".

在基于推送的模型中,请说:

abstract class Element {
  Stream<MouseEvent> get onClick;
}
Run Code Online (Sandbox Code Playgroud)

发生单击时,类会通知您.可能存在介于0和字面无限点击事件之间的任何位置,并且通常不希望以与读取资源相同的方式缓冲或处理它们(特别是考虑到Dart是单线程的).另一个注意事项:拥有此流的任意数量的订阅者是完全有效的(多个类可能有兴趣知道何时发生点击).

另一方面,有一个基于拉的模型,比如读取文件:

abstract class File {
  Stream<String> readLines();
}
Run Code Online (Sandbox Code Playgroud)

这种情况下,您可能希望逐行处理,甚至可能在您点击某个字符串时停止,并且您肯定希望通过EOF通知(通常通过"完成"事件或流关闭,在惯用语中镖).另注:这是不是有效的有超过1个用户-这将变得棘手实快.

对于您的特定问题,您似乎想要:

  • 发送您自己的活动
  • 按需接收下一个活动
  • 处理事件,发送您自己的事件
  • 等等...

好的,让我们来看看你问题的细节:


据我所知,你可以通过在现有的Single-substription流上使用asBroadcastStream来做到这一点,但我觉得这并不理想.

非常不理想.单订阅流(我称之为"资源流",就像readLines上面的调用一样)会自动缓冲事件并等待订阅者.没有收到一行文本会非常糟糕,因为订阅发生文件被读取之后.

在另一方面,广播流也没有缓冲的事件.因此,如果您根据时间问题等采用您的方法,您可能会发现自己丢失了所发送的事件.

这里有几个选项.没有一个是完美的,但他们可能会帮助:

  • package:stream_transform软件包具有一组常用的转换Stream,包括一些受RX启发的转换.

  • package:async包具有用于处理异步代码的其他实用程序.具体来说,您可能会发现StreamQueue它完全符合您的要求:

    Future<void> processEvents(Stream<String> inputStream) async {
      var queue = new StreamQueue(inputStream);
      while (await queue.hasNext) {
        var next = await queue.next;
        // Insert processing here.
      }
    }
    
    Run Code Online (Sandbox Code Playgroud)

我有点觉得自己希望那StreamQueue dart:async,而且也没有之间的共享接口Stream用于用于赛事资源和流s,但是这是今天一个体面的方式.

干杯!