使用异步模式(queue.BeginReceive,queue.EndReceive)使用Reactive扩展(Rx)进行MSMQ消息接收

Jsi*_*inh 10 reactive-programming system.reactive

我一直在使用Rx一段时间用于我的项目上的事件,并专门用于Socket编程,其中很好的部分是它做得很好.管理我的代码,性能优势,更好地执行和解释.

最近我必须修改我的项目的流程,我需要将所有传入的数据(从套接字操作)转储到队列中(使用MSMQ实现作为排队决定).

由于MSMQ提供异步调用以从队列中取消消息(但是以一种奇怪的模式).我现在一直在努力将Rx用于此目的,但是能够这样做.

问题:有人可以给我一个干净的代码示例来实现Rx,以便使用异步模式从队列接收消息.

我需要MSMQ的异步运算符实现类似于这样的东西

var data = Observable.FromAsyncPattern<byte[]>(
                        this.receiverSocket.BeginReceive,
                        this.receiverSocket.EndReceive(some parameters);
Run Code Online (Sandbox Code Playgroud)

提前致谢.*欢呼*到Rx和.NET

Ank*_*kur 4

它会很简单:

var queue = new System.Messaging.MessageQueue("test");
var fun = Observable.FromAsyncPattern((cb, obj) => queue.BeginReceive(TimeSpan.FromMinutes(10),obj,cb), a => queue.EndReceive(a));
var obs = fun();
Run Code Online (Sandbox Code Playgroud)

  • @Micky 那么现在类似的解决方案是什么? (3认同)