Cod*_*ero 13 c# system.reactive
如何清除ReplaySubject上的缓冲区?
我需要定期清除缓冲区(在我的情况下作为日常事件的结束)以防止ReplaySubject不断增长并最终占用所有内存.
理想情况下,我希望保持相同的ReplaySubject,因为客户端订阅仍然很好.
谢谢.
Jam*_*rld 11
ReplaySubject 不提供清除缓冲区的方法,但有几种重载以不同方式约束其缓冲区:
TimeSpan保留项目的最大值这是一个非常有趣的问题 - 我决定看看实现ReplaySubject你可以清除的变体是多么容易- 使用现有的主题和操作符(因为它们非常强大).事实证明它相当简单.
我通过内存分析器运行它来检查它是否正确.调用Clear()刷新缓冲区,否则它就像常规的无限制一样ReplaySubject:
public class RollingReplaySubject<T> : ISubject<T>
{
private readonly ReplaySubject<IObservable<T>> _subjects;
private readonly IObservable<T> _concatenatedSubjects;
private ISubject<T> _currentSubject;
public RollingReplaySubject()
{
_subjects = new ReplaySubject<IObservable<T>>(1);
_concatenatedSubjects = _subjects.Concat();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void Clear()
{
_currentSubject.OnCompleted();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void OnNext(T value)
{
_currentSubject.OnNext(value);
}
public void OnError(Exception error)
{
_currentSubject.OnError(error);
}
public void OnCompleted()
{
_currentSubject.OnCompleted();
_subjects.OnCompleted();
// a quick way to make the current ReplaySubject unreachable
// except to in-flight observers, and not hold up collection
_currentSubject = new Subject<T>();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _concatenatedSubjects.Subscribe(observer);
}
}
Run Code Online (Sandbox Code Playgroud)
尊重通常的规则(与任何一样Subject)并且不要同时调用此类的方法 - 包括Clear().如果需要,您可以简单地添加同步锁.
它通过在主ReplaySubject中嵌套一系列ReplaySubject来工作.外部ReplaySubject(_subjects)拥有一个内部ReplaySubject(_currentSubject)的缓冲区,并在构造时填充.
这些OnXXX方法调用_currentSubjectReplaySubject.
观察者订阅了嵌套ReplaySubjects(保留在_concatenatedSubjects)的连接投影.由于缓冲区大小_subjects仅为1,因此新订户仅获取最近的事件ReplaySubject.
每当我们需要"清除缓冲区"时,现有的_currentSubject是OnCompleted一个新的ReplaySubject被添加到_subjects并成为新的_currentSubject.
按照@ Brandon的建议,我创建了一个版本RollingReplaySubject,使用一个TimeSpan或一个输入流来表示缓冲区清除.我在这里为此创建了一个要点:https://gist.github.com/james-world/c46f09f32e2d4f338b07
| 归档时间: |
|
| 查看次数: |
6468 次 |
| 最近记录: |