将 Stream<Stream<T>> 转换为 Stream<T>

Dip*_*ipu 5 dart rxdart

我正在使用rxdart包来处理 dart 中的流。我被困在处理一个特殊的问题上。

请看一下这个虚拟代码:

final userId = BehaviorSubject<String>();

Stream<T> getStream(String uid) {
  // a sample code that returns a stream
  return BehaviorSubject<T>().stream;
}

final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));
Run Code Online (Sandbox Code Playgroud)

现在我想将oops变量转换为仅获取Observable<T>.

我发现很难解释清楚。但让我试试。我有一个流 A。我将流 A 的每个输出映射到另一个流 B。现在我有Stream<Stream<B>>- 一种循环流。我只想听听这个模式产生的最新值。我怎样才能做到这一点?

Dip*_*ipu 5

我将列出几种将 扁平 Stream<Stream<T>>化为 single 的方法Stream<T>

1. 使用纯飞镖

正如@ Irn所回答的,这是一个纯飞镖解决方案:

Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Stream<int> s = flattenStreams(Stream.fromIterable(list).map(getStream));
    s.listen(print);
}
Run Code Online (Sandbox Code Playgroud)

输出: 1 2 3 4 1 2 3 4 1 2 3 4

2. 使用 Observable.flatMap

Observable 有一个 flatMap 方法可以将输出流展平并将其附加到正在进行的流中:

import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.fromIterable(list).flatMap(getStream);
    s.listen(print);
}
Run Code Online (Sandbox Code Playgroud)

输出: 1 2 3 4 1 2 3 4 1 2 3 4

3. 使用 Observable.switchLatest

将发出流的流(也称为“高阶流”)转换为单个 Observable,该 Observable 发出由这些流中最近发出的项目发出的项目。

这是我一直在寻找的解决方案!我只需要内部流发出的最新输出。

import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.switchLatest(
            Observable.fromIterable(list).map(getStream));
    s.listen(print);
}
Run Code Online (Sandbox Code Playgroud)

输出: 1 1 1 2 3 4


lrn*_*lrn 3

拥有 a 的情况比较罕见Stream<Stream<Something>>,因此没有太多明确的支持。

一个原因是有多种(至少两种)方法可以将事物流组合成事物流。

  1. 要么依次监听每个流,等待它完成,然后再开始下一个流,然后按顺序发出事件。

  2. 或者,您可以在每个新流可用时监听它,然后尽快从任何流中发出事件。

前者很容易使用async/编写await

Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}
Run Code Online (Sandbox Code Playgroud)

后者更为复杂,因为它需要一次监听多个流,并将它们的事件组合起来。(如果StreamController.addStream一次只允许多个流,那么会容易得多)。您可以使用以下类StreamGrouppackage:async

import "package:async/async" show StreamGroup;

Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
  var sg = StreamGroup<T>();
  source.forEach(sg.add).whenComplete(sg.close);
  // This doesn't handle errors in [source].
  // Maybe insert 
  //   .catchError((e, s) {
  //      sg.add(Future<T>.error(e, s).asStream())
  // before `.whenComplete` if you worry about errors in [source].          
  return sg.stream;
}
Run Code Online (Sandbox Code Playgroud)