使用async/await,RX和LINQ进行异步消息处理

Juh*_*uha 2 .net c# linq system.reactive async-await

我正在将Reactive Extensions与async/await结合使用,以简化我的套接字协议实现.当特定消息到达时必须执行一些操作(例如,将'pong'发送到每个'ping'消息),并且还有一些方法需要异步等待某些特定响应.以下示例说明了这一点:

private Subject<string> MessageReceived = new Subject<string>();

//this method gets called every time a message is received from socket
internal void OnReceiveMessage(string message)
{
    MessageReceived.OnNext(message);
    ProcessMessage(message);
}

public async Task<string> TestMethod()
{
    var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync();
    await SendMessage("ABC");

    //some code...

    //if response we are waiting for comes before next row, we miss it
    return await expectedMessage;
}
Run Code Online (Sandbox Code Playgroud)

TestMethod()将"ABC"发送到套接字并在接收到例如"DEF"时继续(在此之前可能还有一些其他消息).

这几乎可以,但有一个竞争条件.似乎这段代码不会监听消息,直到return await expectedMessage;这是一个问题,因为有时消息会在此之前到达.

Shl*_*omo 5

FirstOrDefaultAsync在这里不会很好地工作:它不会订阅直到await线,这会让你有一个竞争条件(正如你所指出).以下是如何替换它:

    var expectedMessage = MessageReceived
        .Where(x => x.EndsWith("D") && x.EndsWith("F"))
        .Take(1)
        .Replay(1)
        .RefCount();

    using (var dummySubscription = expectedMessage.Subscribe(i => {}))
    {
        await SendMessage("ABC");

        //Some code... goes here.

        return await expectedMessage;
    }
Run Code Online (Sandbox Code Playgroud)

.Replay(1)确保新订阅获得最新条目(假设存在一个).它只适用于有用户收听的情况dummySubscription.