我为反应式扩展创建了一个SlidingWindow()运算符,因为我希望能够轻松监控滚动平均值等事情.作为一个简单的例子,我想订阅听鼠标事件,但每次有事件我想收到最后三个(而不是等待每三个事件接收最后三个).这就是为什么Window重载我发现似乎没有给我开箱即用的东西.
这就是我提出的.鉴于其频繁的List操作,我担心它可能不是最高性能的解决方案:
public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length)
{
var seed = new List<T>();
Func<List<T>, T, List<T>> accumulator = (list, arg2) =>
{
list.Add(arg2);
if (list.Count > length)
list.RemoveRange(0, (list.Count - length));
return list;
};
return seq.Scan(seed, accumulator)
.Where(list => list.Count == length);
}
Run Code Online (Sandbox Code Playgroud)
它可以这样调用:
var rollingSequence = Observable.Range(1, 5).SlidingWindow().ToEnumerable();
Run Code Online (Sandbox Code Playgroud)
但是,令我惊讶的是,而不是收到预期的结果
1,2,3
2,3,4
3,4,5
Run Code Online (Sandbox Code Playgroud)
我收到了结果
2,3,4
3,4,5
3,4,5
Run Code Online (Sandbox Code Playgroud)
任何见解将不胜感激!
Jam*_*rld 12
使用原始测试,参数为3表示计数,这会得到所需的结果:
public static IObservable<IList<T>> SlidingWindow<T>(
this IObservable<T> source, int count)
{
return source.Buffer(count, 1)
.Where(list => list.Count == count);
}
Run Code Online (Sandbox Code Playgroud)
像这样测试:
var source = Observable.Range(1, 5);
var query = source.SlidingWindow(3);
using (query.Subscribe(i => Console.WriteLine(string.Join(",", i))))
{
}
Run Code Online (Sandbox Code Playgroud)
输出:
1,2,3
2,3,4
3,4,5
Run Code Online (Sandbox Code Playgroud)
这里的滑动窗口实现不足以满足我对滑动窗口的想法。最接近的是 using Buffer(N, 1)but 是一个问题,因为它在发出第一个结果之前等待前 N 个项目,然后滑出序列的末尾。我想要一次发射最多 N 个项目。
我最终得到了这个实现:
public static IObservable<IList<T>> SlidingWindow<T>(this IObservable<T> obs, int windowSize) =>
Observable.Create<IList<T>>(observer =>
{
var buffer = new CircularBuffer<T>(windowSize);
return obs.Subscribe(
value =>
{
buffer.Add(value);
observer.OnNext(buffer.ToList());
},
ex => observer.OnError(ex),
() => observer.OnCompleted()
);
});
Run Code Online (Sandbox Code Playgroud)
我最初使用队列作为缓冲区,但想使用更轻量级的东西。
public class CircularBuffer<T> : IReadOnlyList<T>
{
private readonly T[] buffer;
private int offset;
private int count;
public CircularBuffer(int bufferSize) => this.buffer = new T[bufferSize];
public int Capacity => buffer.Length;
public int Count => count;
public T this[int index] => index < 0 || index >= count
? throw new ArgumentOutOfRangeException(nameof(index))
: buffer[(offset + index) % buffer.Length];
public void Add(T value)
{
buffer[(offset + count) % buffer.Length] = value;
if (count < buffer.Length) count++;
else offset = (offset + 1) % buffer.Length;
}
public IEnumerator<T> GetEnumerator()
{
for (var i = 0; i < count; i++)
yield return buffer[(offset + i) % buffer.Length];
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
Run Code Online (Sandbox Code Playgroud)
它将产生以下序列Observable.Range(0, 10).SlidingWindow(3):
0,1,2,3,4,5,6,7,8,9
[0]
[0,1]
[0,1,2]
[1,2,3]
[2,3,4]
[3,4,5]
[4,5,6]
[5,6,7]
[6,7,8]
[7,8,9]
Run Code Online (Sandbox Code Playgroud)
试试这个 - 我不得不坐下来考虑它的相对性能,但它至少 可能一样好,而且更容易阅读:
public static IObservable<IList<T>> SlidingWindow<T>(
this IObservable<T> src,
int windowSize)
{
var feed = src.Publish().RefCount();
// (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list
return Observable.Zip(
Enumerable.Range(0, windowSize)
.Select(skip => feed.Skip(skip))
.ToArray());
}
Run Code Online (Sandbox Code Playgroud)
试验台:
var source = Observable.Range(0, 10);
var query = source.SlidingWindow(3);
using(query.Subscribe(Console.WriteLine))
{
Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)
输出:
ListOf(0,1,2)
ListOf(1,2,3)
ListOf(2,3,4)
ListOf(3,4,5)
ListOf(4,5,6)
...
Run Code Online (Sandbox Code Playgroud)
编辑:顺便说.Publish().RefCount()一句,自从被烧毁一次后,我发现自己强迫性地不这样做......我认为这里没有严格要求,tho。
为 yzorg 编辑:
如果您像这样扩充方法,您将更清楚地看到运行时行为:
public static IObservable<IList<T>> SlidingWindow<T>(
this IObservable<T> src,
int windowSize)
{
var feed = src.Publish().RefCount();
// (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list
return Observable.Zip(
Enumerable.Range(0, windowSize)
.Select(skip =>
{
Console.WriteLine("Skipping {0} els", skip);
return feed.Skip(skip);
})
.ToArray());
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2184 次 |
| 最近记录: |