如何清除ReplaySubject上的缓冲区?

Cod*_*ero 13 c# system.reactive

如何清除ReplaySubject上的缓冲区?

我需要定期清除缓冲区(在我的情况下作为日常事件的结束)以防止ReplaySubject不断增长并最终占用所有内存.

理想情况下,我希望保持相同的ReplaySubject,因为客户端订阅仍然很好.

谢谢.

Jam*_*rld 11

ReplaySubject 不提供清除缓冲区的方法,但有几种重载以不同方式约束其缓冲区:

  • TimeSpan保留项目的最大值
  • 最大项目数
  • 上述的组合,只要满足任一条件就会丢弃项目.

一个可清除的重播主题

这是一个非常有趣的问题 - 我决定看看实现ReplaySubject可以清除的变体是多么容易- 使用现有的主题和操作符(因为它们非常强大).事实证明它相当简单.

我通过内存分析器运行它来检查它是否正确.调用Clear()刷新缓冲区,否则它就像常规的无限制一样ReplaySubject:

public class RollingReplaySubject<T> : ISubject<T>
{
    private readonly ReplaySubject<IObservable<T>> _subjects;
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public RollingReplaySubject()
    {
        _subjects = new ReplaySubject<IObservable<T>>(1);
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        _currentSubject.OnCompleted();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void OnNext(T value)
    {
        _currentSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _currentSubject.OnError(error);
    }

    public void OnCompleted()
    {
        _currentSubject.OnCompleted();
        _subjects.OnCompleted();     
        // a quick way to make the current ReplaySubject unreachable
        // except to in-flight observers, and not hold up collection
        _currentSubject = new Subject<T>();       
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}
Run Code Online (Sandbox Code Playgroud)

尊重通常的规则(与任何一样Subject)并且不要同时调用此类的方法 - 包括Clear().如果需要,您可以简单地添加同步锁.

它通过在主ReplaySubject中嵌套一系列ReplaySubject来工作.外部ReplaySubject(_subjects)拥有一个内部ReplaySubject(_currentSubject)的缓冲区,并在构造时填充.

这些OnXXX方法调用_currentSubjectReplaySubject.

观察者订阅了嵌套ReplaySubjects(保留在_concatenatedSubjects)的连接投影.由于缓冲区大小_subjects仅为1,因此新订户仅获取最近的事件ReplaySubject.

每当我们需要"清除缓冲区"时,现有的_currentSubjectOnCompleted一个新的ReplaySubject被添加到_subjects并成为新的_currentSubject.

增强功能

按照@ Brandon的建议,我创建了一个版本RollingReplaySubject,使用一个TimeSpan或一个输入流来表示缓冲区清除.我在这里为此创建了一个要点:https://gist.github.com/james-world/c46f09f32e2d4f338b07

  • 我认为你手头上有太多时间:) 非常好。如果您声明构造函数采用`IObservable&lt;object&gt;` 作为信号,您可以完全消除`TBufferClearing` 通用参数(没有实际用途)以及`Unit` 东西。我怀疑 `On*` 方法也需要由 `gate` 保护,否则如果清除信号触发并且正在清除就像调用 `On*` 一样,就会出现竞争条件......通知可能会丢失。它可能应该实现“IDisposable”,因为它持有对缓冲区清除的订阅。 (2认同)