Cel*_*Cel 6 .net c# serialization persistence system.reactive
[ 这个问题属于Reactive Extensions(Rx)领域 ]
int nValuesBeforeOutput = 123;
myStream.Buffer(nValuesBeforeOutput).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));
Run Code Online (Sandbox Code Playgroud)
现在我需要序列化和反序列化此订阅的状态,以便下次启动应用程序时,缓冲区计数不会从零开始,而是从应用程序退出之前的缓冲区计数开始.
基于Paul Betts方法,这是一个半概括的实现,在我的初始测试中起作用
int nValuesBeforeOutput = 123;
var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));
Run Code Online (Sandbox Code Playgroud)
private static bool _alreadyRecording;
public static IObservable<T> Record<T>(this IObservable<T> input,
IRepositor repositor)
{
IObservable<T> output = input;
List<T> records = null;
if (repositor.Deserialize(ref records))
{
ISubject<T> history = new ReplaySubject<T>();
records.ForEach(history.OnNext);
output = input.Merge(history);
}
if (!_alreadyRecording)
{
_alreadyRecording = true;
input.Subscribe(i => repositor.SerializeAppend(new List<T> {i}));
}
return output;
}
public static IObservable<T> ClearRecords<T>(this IObservable<T> input,
IRepositor repositor)
{
input.Subscribe(i => repositor.Clear());
return input;
}
Run Code Online (Sandbox Code Playgroud)
笔记
_alreadyRecording如果您订阅myRecordableStream多次,则需要_alreadyRecording 是一个静态布尔值,非常难看,并且如果需要并行订阅,则阻止扩展方法在多个地方使用 - 需要重新实现以备将来使用对此没有通用的解决方案,制作一个解决方案是 NonTrivial\xe2\x84\xa2。您可以做的最接近的事情是使 myStream 成为某种可重放的 Observable(即,不是序列化状态,而是序列化 myStream 的状态并重做工作以让您回到原来的位置)。
\n| 归档时间: |
|
| 查看次数: |
458 次 |
| 最近记录: |