Riy*_*man 5 messaging go zeromq
我有一个在服务器上运行的应用程序,它接收来自电话应用程序的请求,然后在工作服务器之间对请求进行负载平衡。我正在尝试添加超时,以防主服务器上已在出站队列中超时长度的消息从队列中删除。更具体地说,主服务器上的应用程序是用golang编写的,并实现了负载均衡的Paranoid Pirate Pattern。我目前拥有的代码是:
import (
"fmt"
zmq "github.com/pebbe/zmq4"
"time"
)
const (
HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1500 * time.Millisecond
MESSAGE_READY = "\001"
MESSAGE_HEARTBEAT = "\002"
)
var (
client *zmq.Socket
backend *zmq.Socket
frontend *zmq.Socket
workerPoller *zmq.Poller
brokerPoller *zmq.Poller
workerQueue []Worker
)
type Worker struct {
Id string
Expire time.Time
}
type RequestWrapper {
RequestToSend Request
}
func NewWorker(id string) Worker {
return Worker{
Id: id,
Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
}
}
func AddReadyWorker(workers []Worker, worker Worker) []Worker {
fmt.Println(worker.Id, " joined")
for i, w := range workers {
if worker.Id == w.Id {
if i == 0 {
workers = workers[1:]
} else if i == len(workers)-1 {
workers = workers[:i]
} else {
workers = append(workers[:i], workers[i+1:]...)
}
break
}
}
return append(workers, worker)
}
func PurgeInactiveWorkers() {
now := time.Now()
for i, worker := range workerQueue {
if now.Before(worker.Expire) {
workerQueue = workerQueue[i:]
return
}
}
workerQueue = workerQueue[0:0]
}
func LoadBalance() {
// Loop:
heartbeat := time.Tick(HEARTBEAT_INTERVAL)
for {
var sockets []zmq.Polled
// If you have available workers, poll on the both front and backend
// If not poll on backend with infinite timeout
if len(workerQueue) > 0 {
sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
} else {
sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
}
for _, socket := range sockets {
switch socket.Socket {
// backend is a router
case backend:
workerId, _ := backend.Recv(0)
workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
clientId, _ := backend.Recv(0)
if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
route, _ := backend.Recv(0)
message, _ := backend.RecvBytes(0)
fmt.Println("Received response")
RouteResponse(route, message)
// frontend.Send(clientId, zmq.SNDMORE)
// frontend.Send("", zmq.SNDMORE)
// frontend.SendBytes(message, 0)
}
// frontend is a dealer
case frontend:
clientId, _ := frontend.Recv(0)
route, _ := frontend.Recv(0)
message, _ := frontend.RecvBytes(0)
backend.Send(workerQueue[0].Id, zmq.SNDMORE)
backend.Send(clientId, zmq.SNDMORE)
backend.Send(route, zmq.SNDMORE)
backend.SendBytes(message, 0)
workerQueue = workerQueue[1:]
}
}
select {
case <-heartbeat:
for _, worker := range workerQueue {
backend.Send(worker.Id, zmq.SNDMORE)
backend.Send(MESSAGE_HEARTBEAT, 0)
}
break
default:
}
PurgeInactiveWorkers()
}
}
Run Code Online (Sandbox Code Playgroud)
如果后端发送了一条消息,但它实际上并没有在一段时间内发送给工作人员,我希望它过期并且永远不会被发送。是否有可以实现此目的的套接字选项?如果没有,我需要怎么做才能做到这一点?
我认为可以在没有套接字选项的情况下执行此操作的两种方法是:
1)让后端将消息包装在包装器中并发送到 golang 队列,而不是通过 zeromq。包装器包含消息“发送”的时间。后端并发地一次从 golang 队列的前端拉取一个,并检查消息是否过期。如果是,则不要发送,如果不是,则发送消息。我可以让后端先将消息添加到 golang 队列,然后在同一代码块中真正将其发送出去。这样,我就不需要锁了。
2) 通过 zeromq 将包装器消息发送到检索器,检索器检查其是否已过期并提前返回。我不喜欢这样,因为它似乎对性能不利。
最后,解决方案是添加一个像@colini和@bazza提到的expires-at属性,并在每次心跳后从队列中删除超时消息。然而,事实证明,这样做并满足应用程序的所有要求比乍一看更困难,因此我最终使用了 RabbitMQ,其 ttl-expires 参数提供了所需的功能。
| 归档时间: |
|
| 查看次数: |
949 次 |
| 最近记录: |