Orleans: two-way communication between a grain client and a grain

Mar*_*rek 3 orleans

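I want to implement a callback to the grain client from within a grain method. For example, while Grain.Method1 is executing, it needs to call a client method to fetch some data.

I tried to do this with streams, but when I subscribe to the stream on the client, the handler never fires.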

Grain:

// Silo configuration
var config = ClusterConfiguration.LocalhostPrimarySilo();
config.AddMemoryStorageProvider();
config.Globals.RegisterStorageProvider<MemoryStorage>("PubSubStore");
config.Globals.RegisterStreamProvider<SimpleMessageStreamProvider>("MySMSProvider");
...
// Grain: publish a message to the stream once per second
public override async Task OnActivateAsync() {
    var streamProvider = GetStreamProvider("MySMSProvider");
    var stream = streamProvider.GetStream<MyTypeMessage>(myGuid, "MyStream");

    RegisterTimer(s => {
        return stream.OnNextAsync(new MyTypeMessage());
    }, null, TimeSpan.FromMilliseconds(1000), TimeSpan.FromMilliseconds(1000));
...

Client:

var clientConfiguration = ClientConfiguration.LocalhostSilo();
clientConfiguration.RegisterStreamProvider<SimpleMessageStreamProvider>("MySMSProvider");
GrainClient.Initialize(clientConfiguration);
...
var streamProvider = GrainClient.GetStreamProvider("MySMSProvider");
var stream = streamProvider.GetStream<MyTypeMessage>(myGuid, "MyStream");
await stream.SubscribeAsync(
    async (message, token) => { /* process message; this handler never fires */ });

Ale*_*bak 5

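It looks like you are not keeping a reference to the StreamSubscriptionHandle<MyTypeMessage> returned by the subscription.

Try something like this: var subscriptionHandle = await _factCalculationRequestStream.SubscribeAsync(async (message, token) => { /* process message */ }); and make sure subscriptionHandle is not garbage collected, for example by storing it in a field of a long-lived object.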
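A minimal sketch of that suggestion, assuming the Orleans 1.x client API used in the question (GrainClient, SimpleMessageStreamProvider). The class MyStreamListener and its methods StartAsync, StopAsync and OnMessageAsync are hypothetical names introduced only for illustration, and MyTypeMessage is a placeholder for the question's message type. The provider name and stream namespace are copied from the question. It shows one way to keep the handle reachable and is not a confirmed fix.

```csharp
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

// Placeholder for the question's message type (assumption).
[Serializable]
public class MyTypeMessage { }

// Hypothetical long-lived client-side object that owns the subscription.
public class MyStreamListener
{
    // Stored in a field so the handle stays reachable while the listener lives.
    private StreamSubscriptionHandle<MyTypeMessage> _subscriptionHandle;

    // Assumes GrainClient.Initialize(...) has already been called.
    public async Task StartAsync(Guid myGuid)
    {
        var streamProvider = GrainClient.GetStreamProvider("MySMSProvider");
        var stream = streamProvider.GetStream<MyTypeMessage>(myGuid, "MyStream");

        // Keep the returned handle instead of discarding it.
        _subscriptionHandle = await stream.SubscribeAsync(OnMessageAsync);
    }

    // Invoked for every message the grain publishes to the stream.
    private Task OnMessageAsync(MyTypeMessage message, StreamSequenceToken token)
    {
        Console.WriteLine("Received a message from the grain");
        return Task.CompletedTask;
    }

    // Explicitly end the subscription when the listener is no longer needed.
    public async Task StopAsync()
    {
        if (_subscriptionHandle != null)
        {
            await _subscriptionHandle.UnsubscribeAsync();
            _subscriptionHandle = null;
        }
    }
}
```

The listener instance itself must also stay referenced (for example as a field of the hosting service) for as long as messages are expected, and myGuid must match the GUID the grain uses when it calls GetStream.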

In my case, I ended up implementing IGrainObserver on my class (a WebSocketBehavior in a WebAPI project), and this subscription works well:

public class MySocketService: WebSocketBehavior, IGrainObserver, IDisposable
{
  // Kept in a field so the subscription handle stays referenced
  private StreamSubscriptionHandle<MyStreamEvent> _streamSubscriberHandle;
  ...

  // inside some method invoked externally
  _streamSubscriberHandle = await requestedStream
     .SubscribeAsync(onStreamMessage, onStreamError, onStreamComplete);
}