如何在 Golang GRPC 中为持久流(又名 pubsub)创建服务器

ajt*_*tek 5 streaming go grpc

我正在构建需要以发布/订阅方式向所有订阅的消费者发送事件的服务,例如。向所有当前连接的客户端发送一个事件。

我使用 Protobuf 来实现以下原型定义:

service EventsService {
  rpc ListenForEvents (AgentProcess) returns (stream Event) {}
}
Run Code Online (Sandbox Code Playgroud)

服务器和客户端都是用 Go 编写的。

我的问题是,当客户端启动连接时,流的寿命不长,例如。当服务器从ListenForEvents方法返回时:

service EventsService {
  rpc ListenForEvents (AgentProcess) returns (stream Event) {}
}
Run Code Online (Sandbox Code Playgroud)

然后客户端几乎立即收到EOF错误,这意味着服务器可能关闭了连接。

怎么做才能让客户端长期订阅服务器呢?主要问题是,当客户端调用服务器上的方法时,我可能没有任何内容可以发送给客户端ListenForEvents,这就是为什么我希望该流长期存在,以便以后能够发送消息。

Bur*_*dar 2

当您从服务器函数返回时,流终止。相反,您应该以某种方式接收事件,并将它们发送到客户端而不从服务器返回。可能有很多方法可以做到这一点。下面是一种方法的草图。

这依赖于在单独的 goroutine 上运行的服务器连接。有一个 Broadcast() 函数,它将向所有连接的客户端发送消息。它看起来像这样:

var allRegisteredClients map[*pb.AgentProcess]chan Message
var clientsLock sync.RWMutex{}

func Broadcast(msg Message) {
  clientsLock.RLock()
  for _,x:=range allRegisteredClients {
      x<-msg
  }
  clientsLock.RUnlock()
}
Run Code Online (Sandbox Code Playgroud)

然后,您的客户必须自行注册并处理消息:

func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
   clientsLock.Lock()
   ch:=make(chan Message)
   allRegisteredClients[process]=ch
   clientsLock.Unlock()

   for msg:=range ch {
       // send message
       // Deal with errors
       // Deal with client terminations
   }
   clientsLock.Lock()
   delete(allRegisteredClients,process)
   clientsLock.Unlock()
}
Run Code Online (Sandbox Code Playgroud)

正如我所说,这只是这个想法的一个草图。