我正在构建需要以发布/订阅方式向所有订阅的消费者发送事件的服务,例如。向所有当前连接的客户端发送一个事件。
我使用 Protobuf 来实现以下原型定义:
service EventsService {
rpc ListenForEvents (AgentProcess) returns (stream Event) {}
}
Run Code Online (Sandbox Code Playgroud)
服务器和客户端都是用 Go 编写的。
我的问题是,当客户端启动连接时,流的寿命不长,例如。当服务器从ListenForEvents方法返回时:
service EventsService {
rpc ListenForEvents (AgentProcess) returns (stream Event) {}
}
Run Code Online (Sandbox Code Playgroud)
然后客户端几乎立即收到EOF错误,这意味着服务器可能关闭了连接。
怎么做才能让客户端长期订阅服务器呢?主要问题是,当客户端调用服务器上的方法时,我可能没有任何内容可以发送给客户端ListenForEvents,这就是为什么我希望该流长期存在,以便以后能够发送消息。
当您从服务器函数返回时,流终止。相反,您应该以某种方式接收事件,并将它们发送到客户端而不从服务器返回。可能有很多方法可以做到这一点。下面是一种方法的草图。
这依赖于在单独的 goroutine 上运行的服务器连接。有一个 Broadcast() 函数,它将向所有连接的客户端发送消息。它看起来像这样:
var allRegisteredClients map[*pb.AgentProcess]chan Message
var clientsLock sync.RWMutex{}
func Broadcast(msg Message) {
clientsLock.RLock()
for _,x:=range allRegisteredClients {
x<-msg
}
clientsLock.RUnlock()
}
Run Code Online (Sandbox Code Playgroud)
然后,您的客户必须自行注册并处理消息:
func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
clientsLock.Lock()
ch:=make(chan Message)
allRegisteredClients[process]=ch
clientsLock.Unlock()
for msg:=range ch {
// send message
// Deal with errors
// Deal with client terminations
}
clientsLock.Lock()
delete(allRegisteredClients,process)
clientsLock.Unlock()
}
Run Code Online (Sandbox Code Playgroud)
正如我所说,这只是这个想法的一个草图。
| 归档时间: |
|
| 查看次数: |
4477 次 |
| 最近记录: |