我想使用 Rx 缓冲区功能:
var source = new Subject<Price>();
var buffer = source
.Buffer(TimeSpan.FromSeconds(30), 5)
.Where(p => p.Any());
Run Code Online (Sandbox Code Playgroud)
这意味着当缓冲区达到 5 或 30 秒后,自上次发出以来已经过去时,就会发出(发布给订阅者)。
但是我需要能够按需发送 - 例如当我收到高优先级序列项目时。然后我想将它添加到 observable ( source.OnNext()) 并以某种方式强制它发出(这意味着返回缓冲区中的所有元素并清除它)。
我知道我可以添加以下代码:
var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);
Run Code Online (Sandbox Code Playgroud)
并调用flusher.OnNext(highPriorityItem),我会发出它。
但在这种情况下,我有两个独立的序列,有两个不同的发射。当缓冲区已满或特定项目按顺序出现时,我需要发出一次。
Force flush count-type Observable.Buffer c#和Force flush to Observable.Buffer c#似乎不适合我