将 ConcurrentQueue<T> 公开为 IObservable<T>?

Jas*_*and 5 queue system.reactive c#-4.0

我想知道是否可以使用队列(特别是 ConcurrentQueue)作为 IObservable 的源?就像是;

  Queue = new ConcurrentQueue<IMessage>();
  var xs = Queue.AsEnumerable().ToObservable();

  xs.Subscribe((IMessage msg) =>
     {
        Console.WriteLine("Msg :" + msg.subject);
     });
Run Code Online (Sandbox Code Playgroud)

我想这确实没有意义,因为没有任何东西被出列。我正在尝试实现一个非阻塞进程,它可以订阅推送给观察者的“消息”,因此使用队列。我确信我应该能够用 RX 做到这一点,但似乎无法理解它!

我对有关如何实施的任何建议感兴趣。谢谢!

Ser*_*hov 3

你是对的,转换一个队列(并发或简单,并不重要)只会枚举它,但不会出队。“真正的”实现是可能的,但更复杂 - 请参阅我在 RX 论坛上提出的类似问题的链接(与 StackOverflow 相比,这仍然是关于 RX 的更好的信息来源):

如何使用RX实现单个worker消费者生产者队列?