Masstransit Rx集成

Luk*_*Luk 5 masstransit system.reactive

Masstransit文档中提到了Rx集成,但我找不到任何使用它的例子.

我有两个问题 - 如何使用它以及它给出的优点是什么?

Vla*_*nov 7

你需要安装两个nuget-packages:Masstransit.ReactiveSystem.Reactive.Linq.

添加使用:

using MassTransit.Reactive;
using System.Reactive.Linq;
Run Code Online (Sandbox Code Playgroud)

然后使用:

IObservable<IMessage> messages = bus.AsObservable<IMessage>();

messages.Subscribe(
    onNext: msg => Console.WriteLine(msg.MessageId),
    onCompleted: () => Console.WriteLine("completed"),
    onError: ex => Console.WriteLine(ex.Message));
Run Code Online (Sandbox Code Playgroud)

IMessage你的消息实现的界面在哪里?

但是,在使用它时,我注意到以下特点:

  • 只有观察到的答案(没有观察到请求本身)
  • 仅观察通过公交车发布的消息(而不是通过交通工具)
  • 消息是不变的(不允许强制转换为子类型)
  • 没有例外
  • 没有观察到序列的结束(即使总线停止)
  • 处理程序序列中抛出异常,中断观察序列(因此验证观察者无效)

所以我不完全理解如何在实践中应用它.