我已经实现了流转换器。请注意,这只是一个练习(为了学习 Dart)。该转换器将整数转换为字符串。我给出了下面的代码,你也可以在GitHub上找到它。
// Conceptually, a transformer is simply a function from Stream to Stream that
// is encapsulated into a class.
//
// A transformer is made of:
// - A stream controller. The controller provides the "output" stream that will
// receive the transformed values.
// - A "bind()" method. This method is called by the "input" stream "transform"
// method (inputStream.transform(<the stream transformer>).
import 'dart:async';
/// This class defines the implementation of a class that emulates a function
/// that converts a data with a given type (S) into a data with another type (T).
abstract class TypeCaster<S, T> {
T call(S value);
}
/// This class emulates a converter from integers to strings.
class Caster extends TypeCaster<int, String> {
String call(int value) {
return "<${value.toString()}>";
}
}
// StreamTransformer<S, T> is an abstract class. The functions listed below must
// be implemented:
// - Stream<T> bind(Stream<S> stream)
// - StreamTransformer<RS, RT> cast<RS, RT>()
class CasterTransformer<S, T> implements StreamTransformer<S, T> {
StreamController<T> _controller;
bool _cancelOnError;
TypeCaster<S, T> _caster;
// Original (or input) stream.
Stream<S> _stream;
// The stream subscription returned by the call to the function "listen", of
// the original (input) stream (_stream.listen(...)).
StreamSubscription<S> _subscription;
/// Constructor that creates a unicast stream.
/// [caster] An instance of "type caster".
CasterTransformer(TypeCaster<S, T> caster, {
bool sync: false,
bool cancelOnError: true
}) {
_controller = new StreamController<T>(
onListen: _onListen,
onCancel: _onCancel,
onPause: () => _subscription.pause(),
onResume: () => _subscription.resume(),
sync: sync
);
_cancelOnError = cancelOnError;
_caster = caster;
}
/// Constructor that creates a broadcast stream.
/// [caster] An instance of "type caster".
CasterTransformer.broadcast(TypeCaster<S, T> caster, {
bool sync: false,
bool cancelOnError: true
}) {
_cancelOnError = cancelOnError;
_controller = new StreamController<T>.broadcast(
onListen: _onListen,
onCancel: _onCancel,
sync: sync
);
_caster = caster;
}
/// Handler executed whenever a listener subscribes to the controller's stream.
/// Note: when the transformer is applied to the original stream, through call
/// to the method "transform", the method "bind()" is called behind the
/// scenes. The method "bind()" returns the controller stream.
/// When a listener is applied to the controller stream, then this function
/// (that is "_onListen()") will be executed. This function will set the
/// handler ("_onData") that will be executed each time a value appears
/// in the original stream. This handler takes the incoming value, casts
/// it, and inject it to the (controller) output stream.
/// Note: this method is called only once. On the other hand, the method "_onData"
/// is called as many times as there are values to transform.
void _onListen() {
_subscription = _stream.listen(
_onData,
onError: _controller.addError,
onDone: _controller.close,
cancelOnError: _cancelOnError
);
}
/// Handler executed whenever the subscription to the controller's stream is cancelled.
void _onCancel() {
_subscription.cancel();
_subscription = null;
}
/// Handler executed whenever data comes from the original (input) stream.
/// Please note that the transformation takes place here.
/// Note: this method is called as many times as there are values to transform.
void _onData(S data) {
_controller.add(_caster(data));
}
/// This method is called once, when the stream transformer is assigned to the
/// original (input) stream. It returns the stream provided by the controller.
/// Note: here, you can see that the process transforms a value of type
/// S into a value of type T. Thus, it is necessary to provide a function
/// that performs the conversion from type S to type T.
/// Note: the returned stream may accept only one, or more than one, listener.
/// This depends on the method called to instantiate the transformer.
/// * CasterTransformer() => only one listener.
/// * CasterTransformer.broadcast() => one or more listener.
Stream<T> bind(Stream<S> stream) {
_stream = stream;
return _controller.stream;
}
// TODO: what should this method do ? Find the answer.
StreamTransformer<RS, RT> cast<RS, RT>() {
return StreamTransformer<RS, RT>((Stream<RS> stream, bool b) {
// What should we do here ?
});
}
}
main() {
// ---------------------------------------------------------------------------
// TEST: unicast controller.
// ---------------------------------------------------------------------------
// Create a controller that will be used to inject integers into the "input"
// stream.
StreamController<int> controller_unicast = new StreamController<int>();
// Get the stream "to control".
Stream<int> integer_stream_unicast = controller_unicast.stream;
// Apply a transformer on the "input" stream.
// The method "transform" calls the method "bind", which returns the stream that
// receives the transformed values.
Stream<String> string_stream_unicast = integer_stream_unicast.transform(CasterTransformer<int, String>(new Caster()));
string_stream_unicast.listen((data) {
print('String => $data');
});
// Inject integers into the "input" stream.
controller_unicast.add(1);
controller_unicast.add(2);
controller_unicast.add(3);
// ---------------------------------------------------------------------------
// TEST: broadcast controller.
// ---------------------------------------------------------------------------
StreamController<int> controller_broadcast = new StreamController<int>.broadcast();
Stream<int> integer_stream_broadcast = controller_broadcast.stream;
Stream<String> string_stream_broadcast = integer_stream_broadcast.transform(CasterTransformer<int, String>.broadcast(new Caster()));
string_stream_broadcast.listen((data) {
print('Listener 1: String => $data');
});
string_stream_broadcast.listen((data) {
print('Listener 2: String => $data');
});
controller_broadcast.add(1);
controller_broadcast.add(2);
controller_broadcast.add(3);
}
Run Code Online (Sandbox Code Playgroud)
该类CasterTransformer<S, T>
扩展了抽象类StreamTransformer<S, T>
。
因此,它实现了该方法StreamTransformer<RS, RT> cast<RS, RT>()
。
在文档中,据说:
生成的转换器将在运行时检查它转换的流的所有数据事件是否实际上是 S 的实例,并将检查该转换器生成的所有数据事件是否实际上是 RT 的实例。
请参阅:https ://api.dartlang.org/stable/2.1.0/dart-async/StreamTransformer/cast.html
首先,我认为本文档中有一个拼写错误:它应该说“ ......it 转换实际上是 RS 的实例”(而不是 S)。
然而,这对我来说似乎很模糊。
有人可以解释该方法的目的Cast()
吗?
该cast
方法可以帮助输入操作。
如果你有一个StreamTransformer<num, int>
,它会将数字转换为整数(例如,通过调用.toInt()
它们然后添加 42,因为这显然很有用!)。如果您想在某个需要 的地方使用该变压器StreamTransformer<int, num>
,那么您不能。由于num
不是 的子类型int
,因此变压器不能分配给该类型。
但您知道,因为您了解流转换器的实际工作原理,所以第一个类型参数仅用于输入。接受 any 的东西num
应该在只给定 s 的地方安全地可用int
。因此,为了让类型系统相信您知道自己在做什么,您可以这样写:
StreamTransformer<int, num> transform = myTranformer.cast<int, num>();
Run Code Online (Sandbox Code Playgroud)
现在,tranformer
接受任何整数 ( RS
),检查它是否是num
( S
),将其传递给myTransformer
调用toInt()
并添加 42,然后将结果int
( T
) 传回并transformer
检查它是否是num
( RT
) 并发出它。
一切正常,类型系统也很满意。
您可以用来cast
做一些在运行时永远不会工作的事情,因为它所做的只是添加额外的运行时检查,让静态类型系统相信事情要么成功,要么抛出这些检查。
获得实现的最简单方法StreamTransformer.cast
是使用StreamTransformer.castFrom
静态方法:
StreamTransformer<RS, RT> cast<RS, RT>() => StreamTransformer.castFrom(this);
Run Code Online (Sandbox Code Playgroud)
这将在您自己的变压器上使用系统的默认转换包装器。
归档时间: |
|
查看次数: |
1026 次 |
最近记录: |