订购反应性扩展事件

ie1*_*ie1 3 udp reactive-programming system.reactive

我在多个线程中接收UDP上的消息.每次接待后我都会加注MessageReceived.OnNext(message).

因为我使用多个线程,所以无序引发的消息是一个问题.

如何通过消息计数器命令加注消息?(假设有一个message.counter属性)

必须记住一条消息可能会在通信中丢失(假设我们在X消息之后有一个计数器孔,那个洞没有填满我提出下一条消息)

必须尽快提出消息(如果收到下一个计数器)

Jam*_*rld 6

在说明检测丢失消息的要求时,您没有考虑到最后一条消息未到达的可能性; 我已经添加了timeoutDuration如果没有在规定的时间到达时刷新缓冲的消息-你可能要认为这是一个错误,而不是,请参阅如何做到这一点的意见.

我将通过使用以下签名定义扩展方法来解决此问题:

public static IObservable<TSource> Sort<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int> keySelector,
    TimeSpan timeoutDuration = new TimeSpan(),
    int gapTolerance = 0)
Run Code Online (Sandbox Code Playgroud)
  • source 是未排序的消息流
  • keySelector是一个int从消息中提取密钥的函数.我假设第一把关键是0; 必要时修改.
  • timeoutDuration 如上所述,如果省略,则没有超时
  • tolerance是等待无序消息时保留的最大消息数.传递0保留任意数量的消息
  • scheduler 是用于超时的调度程序,用于测试目的,如果没有给出,则使用默认值.

演练

我将在这里介绍一个逐行演练.下面重复完整的实施.

分配默认计划程序

首先,如果没有提供默认调度程序,我们必须分配它:

scheduler = scheduler ?? Scheduler.Default;
Run Code Online (Sandbox Code Playgroud)

安排超时

现在,如果请求超时,我们将使用一个副本替换源,OnCompleted如果邮件没有到达,它将直接终止并发送timeoutDuration.

if(timeoutDuration != TimeSpan.Zero)
    source = source.Timeout(
        timeoutDuration,
        Observable.Empty<TSource>(),
        scheduler);
Run Code Online (Sandbox Code Playgroud)

如果您希望发送代码TimeoutException,只需删除第二个参数Timeout- 空流,以选择执行此操作的重载.请注意,我们可以安全地与所有订阅者共享此内容,因此它位于调用之外Observable.Create.

创建订阅处理程序

我们Observable.Create用来构建我们的流.Create每当订阅发生时调用作为参数的lambda函数,并且我们传递调用observer(o).Create返回我们,IObservable<T>所以我们在这里返回.

return Observable.Create<TSource>(o => { ...
Run Code Online (Sandbox Code Playgroud)

初始化一些变量

我们将跟踪下一个预期的键值nextKey,并创建一个SortedDictionary以保持无序消息,直到它们被发送.

int nextKey = 0;  
var buffer = new SortedDictionary<int, TSource>();
Run Code Online (Sandbox Code Playgroud)

订阅源,并处理消息

现在我们可以订阅消息流(可能已应用超时).首先我们介绍OnNext处理程序.下一条消息分配给x:

return source.Subscribe(x => { ...
Run Code Online (Sandbox Code Playgroud)

我们调用keySelector函数从消息中提取密钥:

var key = keySelector(x);
Run Code Online (Sandbox Code Playgroud)

如果消息有一个旧密钥(因为它超出了我们对无序消息的容忍度),我们只是放弃它并完成此消息(您可能希望采取不同的行为):

// drop stale keys
if(key < nextKey) return;
Run Code Online (Sandbox Code Playgroud)

否则,我们可能会有预期的密钥,在这种情况下我们可以增加nextKey发送消息:

if(key == nextKey)
{
    nextKey++;
    o.OnNext(x);                    
}
Run Code Online (Sandbox Code Playgroud)

或者,我们可能有一个无序的未来消息,在这种情况下,我们必须将它添加到我们的缓冲区.如果我们这样做,我们还必须确保我们的缓冲区没有超出我们存储乱序消息的容忍度 - 在这种情况下,我们也将碰到nextKey缓冲区中的第一个键,因为它是一个SortedDictionary方便的下一个最低键:

else if(key > nextKey)
{
    buffer.Add(key, x);
    if(gapTolerance != 0 && buffer.Count > gapTolerance)
        nextKey = buffer.First().Key;
}
Run Code Online (Sandbox Code Playgroud)

现在无论上面的结果如何,我们都需要清空现在准备好的任何键的缓冲区.我们使用辅助方法.请注意,它会进行调整,nextKey因此我们必须小心通过引用传递它.只要密钥相互跟随,我们只需循环缓冲区读取,删除和发送消息,每次递增nextKey:

private static void SendNextConsecutiveKeys<TSource>(
    ref int nextKey,
    IObserver<TSource> observer,
    SortedDictionary<int, TSource> buffer)
{
    TSource x;
    while(buffer.TryGetValue(nextKey, out x))
    {
        buffer.Remove(nextKey);
        nextKey++;
        observer.OnNext(x);                        
    }
}
Run Code Online (Sandbox Code Playgroud)

处理错误

接下来我们提供一个OnError处理程序 - 这将只传递任何错误,包括Timeout异常,如果您选择这样做.

刷新缓冲区

最后,我们必须处理OnCompleted.在这里,我选择清空缓冲区 - 如果无序消息阻止了消息并且从未到达,则这是必要的.这就是我们需要超时的原因:

() => {
    // empty buffer on completion
    foreach(var item in buffer)
        o.OnNext(item.Value);                
    o.OnCompleted();
});
Run Code Online (Sandbox Code Playgroud)

全面实施

这是完整的实现.

public static IObservable<TSource> Sort<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int> keySelector,
    int gapTolerance = 0,
    TimeSpan timeoutDuration = new TimeSpan(),
    IScheduler scheduler = null)
{       
    scheduler = scheduler ?? Scheduler.Default;

    if(timeoutDuration != TimeSpan.Zero)
        source = source.Timeout(
            timeoutDuration,
            Observable.Empty<TSource>(),
            scheduler);

    return Observable.Create<TSource>(o => {
        int nextKey = 0;  
        var buffer = new SortedDictionary<int, TSource>();

        return source.Subscribe(x => {
            var key = keySelector(x);

            // drop stale keys
            if(key < nextKey) return;

            if(key == nextKey)
            {
                nextKey++;
                o.OnNext(x);                    
            }
            else if(key > nextKey)
            {
                buffer.Add(key, x);
                if(gapTolerance != 0 && buffer.Count > gapTolerance)
                    nextKey = buffer.First().Key;
            }
            SendNextConsecutiveKeys(ref nextKey, o, buffer);
        },
        o.OnError,
        () => {
            // empty buffer on completion
            foreach(var item in buffer)
                o.OnNext(item.Value);                
            o.OnCompleted();
        });
    });
}

private static void SendNextConsecutiveKeys<TSource>(
    ref int nextKey,
    IObserver<TSource> observer,
    SortedDictionary<int, TSource> buffer)
{
    TSource x;
    while(buffer.TryGetValue(nextKey, out x))
    {
        buffer.Remove(nextKey);
        nextKey++;
        observer.OnNext(x);                        
    }
}
Run Code Online (Sandbox Code Playgroud)

测试线束

如果您rx-testing在控制台应用程序中包含nuget ,则会为您提供以下测试工具:

public static void Main()
{
    var tests = new Tests();
    tests.Test();
}

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();

        var xs = scheduler.CreateColdObservable(
            OnNext(100, 0),
            OnNext(200, 2),
            OnNext(300, 1),
            OnNext(400, 4),
            OnNext(500, 5),
            OnNext(600, 3),
            OnNext(700, 7),
            OnNext(800, 8),
            OnNext(900, 9),            
            OnNext(1000, 6),
            OnNext(1100, 12),
            OnCompleted(1200, 0));

        //var results = scheduler.CreateObserver<int>();

        xs.Sort(
            keySelector: x => x,
            gapTolerance: 2,
            timeoutDuration: TimeSpan.FromTicks(200),
            scheduler: scheduler).Subscribe(Console.WriteLine);

        scheduler.Start();
    }
}
Run Code Online (Sandbox Code Playgroud)

结束评论

这里有各种有趣的替代方法.我采取这种主要的命令式方法,因为我认为这是最容易遵循的 - 但可能会有一些花哨的分组诡计,你可以用来做到这一点.我知道有一件事一直都是关于Rx的 - 有很多方法可以给猫皮肤!

我对这里的超时想法也不太满意 - 在生产系统中,我想实现一些检查连接的方法,例如心跳或类似的.我没有进入这个,因为显然它将是特定于应用程序的.此外,之前已经在这些主板和其他地方讨论过心跳(例如在我的博客上).