使用Reactive Extensions观察传入的websocket消息?

moo*_*nux 4 c# linq websocket system.reactive

我想使用linq来处理通过websocket连接收到的事件.这是我到目前为止:

    private static void Main()
    {
        string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
        using (WebSocket ws = new WebSocket(WsEndpoint))
        {
            ws.OnMessage += Ws_OnMessage;

            ws.Connect();
            Console.ReadKey();
            ws.Close();
        }
    }

    private static void Ws_OnMessage(object sender, MessageEventArgs e)
    {
        Console.WriteLine(e.Data);
    }
Run Code Online (Sandbox Code Playgroud)

首先想到的是如何ws.OnMessage变成某种事件流.我无法在线找到任何带有反应式扩展的外部事件源的示例.我打算将消息解析为json对象,然后过滤和聚合它们.

有人可以提供一个从websocket消息创建一个observable并订阅它的例子吗?


编辑:最终工作代码

与所选答案的唯一区别在于我在传递之前初始化了websocket Observable.Using

//-------------------------------------------------------
// Create websocket connection
//-------------------------------------------------------
const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
WebSocket socket = new WebSocket(wsEndpoint);


//-------------------------------------------------------
// Create an observable by wrapping ws.OnMessage
//-------------------------------------------------------
var globalEventStream = Observable
    .Using(
        () => socket,
        ws =>
            Observable
                .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                    handler => ws.OnMessage += handler,
                    handler => ws.OnMessage -= handler));
//---------------------------------------------------------
// Subscribe to globalEventStream
//---------------------------------------------------------

IDisposable subscription = globalEventStream.Subscribe(ep =>
{
    Console.WriteLine("Event Recieved");
    Console.WriteLine(ep.EventArgs.Data);
});

//----------------------------------------------------------
// Send message over websocket
//----------------------------------------------------------
socket.Connect();
socket.Send("test message");
// When finished, close the connection.
socket.Close();
Run Code Online (Sandbox Code Playgroud)

Eni*_*ity 6

您应该像这样设置您的observable:

    var observable =
        Observable
            .Using(
                () => new WebSocket(WsEndpoint),
                ws =>
                    Observable
                        .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                            handler => ws.OnMessage += handler,
                            handler => ws.OnMessage -= handler));
Run Code Online (Sandbox Code Playgroud)

这将正确创建套接字,然后在订阅observable时观察事件.订阅处理完毕后,它将正确地从事件中分离出来并丢弃套接字.


类型observable将是IObservable<EventPattern<MessageEventArgs>>.您以这种方式使用此observable:

IDisposable subscription = observable.Subscribe(ep =>
{
    Console.WriteLine(ep.EventArgs.Data);
});
Run Code Online (Sandbox Code Playgroud)

感谢您发布NuGet参考.

这是工作代码:

const string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";

Console.WriteLine("Defining Observable:");

IObservable<EventPattern<WebSocketSharp.MessageEventArgs>> observable =
    Observable
        .Using(
            () =>
            {
                var ws = new WebSocketSharp.WebSocket(WsEndpoint);
                ws.Connect();
                return ws;
            },
            ws =>
                Observable
                    .FromEventPattern<EventHandler<WebSocketSharp.MessageEventArgs>, WebSocketSharp.MessageEventArgs>(
                        handler => ws.OnMessage += handler,
                        handler => ws.OnMessage -= handler));

Console.WriteLine("Subscribing to Observable:");

IDisposable subscription = observable.Subscribe(ep =>
{
    Console.WriteLine("Event Recieved");
    Console.WriteLine(ep.EventArgs.Data);
});

Console.WriteLine("Writing to Source:");

using (var source = new WebSocketSharp.WebSocket(WsEndpoint))
{
    source.Connect();
    source.Send("test");
}
Run Code Online (Sandbox Code Playgroud)

  • 当您处置observable的订阅时,套接字也会被释放. (3认同)