如何让 ZeroMQ 使出站排队但未在设定时间内发送的消息超时?

Riy*_*man 5 messaging go zeromq

我有一个在服务器上运行的应用程序,它接收来自电话应用程序的请求,然后在工作服务器之间对请求进行负载平衡。我正在尝试添加超时,以防主服务器上已在出站队列中超时长度的消息从队列中删除。更具体地说,主服务器上的应用程序是用golang编写的,并实现了负载均衡的Paranoid Pirate Pattern。我目前拥有的代码是:

import (
    "fmt"
    zmq "github.com/pebbe/zmq4"
    "time"
)

const (
    HEARTBEAT_LIVENESS = 3
    HEARTBEAT_INTERVAL = 1500 * time.Millisecond

    MESSAGE_READY     = "\001"
    MESSAGE_HEARTBEAT = "\002"
)

var (
    client *zmq.Socket
    backend *zmq.Socket
    frontend *zmq.Socket
    workerPoller *zmq.Poller
    brokerPoller *zmq.Poller
    workerQueue []Worker
)

type Worker struct {
    Id string
    Expire time.Time
}

type RequestWrapper {
    RequestToSend Request

}

func NewWorker(id string) Worker {
    return Worker{
        Id: id,
        Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
    }
}

func AddReadyWorker(workers []Worker, worker Worker) []Worker {
    fmt.Println(worker.Id, " joined")
    for i, w := range workers {
        if worker.Id == w.Id {
            if i == 0 {
                workers = workers[1:]
            } else if i == len(workers)-1 {
                workers = workers[:i]
            } else {
                workers = append(workers[:i], workers[i+1:]...)
            }
            break
        }
    }
    return append(workers, worker)
}

func PurgeInactiveWorkers() {
    now := time.Now()
    for i, worker := range workerQueue {
        if now.Before(worker.Expire) {
            workerQueue = workerQueue[i:]
            return
        }
    }

    workerQueue = workerQueue[0:0]
}

func LoadBalance() {
// Loop:
    heartbeat := time.Tick(HEARTBEAT_INTERVAL)
    for {
        var sockets []zmq.Polled

        // If you have available workers, poll on the both front and backend
        // If not poll on backend with infinite timeout
        if len(workerQueue) > 0 {
            sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
        } else {
            sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
        }

        for _, socket := range sockets {
            switch socket.Socket {
                // backend is a router
                case backend:
                    workerId, _ := backend.Recv(0)
                    workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
                    clientId, _ := backend.Recv(0)
                    if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
                        route, _ := backend.Recv(0)
                        message, _ := backend.RecvBytes(0)

                        fmt.Println("Received response")
                        RouteResponse(route, message)

                        // frontend.Send(clientId, zmq.SNDMORE)
                        // frontend.Send("", zmq.SNDMORE)
                        // frontend.SendBytes(message, 0)
                    }
                // frontend is a dealer
                case frontend:
                    clientId, _ := frontend.Recv(0)
                    route, _ := frontend.Recv(0)
                    message, _ := frontend.RecvBytes(0)

                    backend.Send(workerQueue[0].Id, zmq.SNDMORE)
                    backend.Send(clientId, zmq.SNDMORE)
                    backend.Send(route, zmq.SNDMORE)
                    backend.SendBytes(message, 0)

                    workerQueue = workerQueue[1:]
            }
        }

        select {
            case <-heartbeat:
                for _, worker := range workerQueue {
                    backend.Send(worker.Id, zmq.SNDMORE)
                    backend.Send(MESSAGE_HEARTBEAT, 0)
                }
                break
            default:
        }

        PurgeInactiveWorkers()
    }
}
Run Code Online (Sandbox Code Playgroud)

如果后端发送了一条消息,但它实际上并没有在一段时间内发送给工作人员,我希望它过期并且永远不会被发送。是否有可以实现此目的的套接字选项?如果没有,我需要怎么做才能做到这一点?

我认为可以在没有套接字选项的情况下执行此操作的两种方法是:

1)让后端将消息包装在包装器中并发送到 golang 队列,而不是通过 zeromq。包装器包含消息“发送”的时间。后端并发地一次从 golang 队列的前端拉取一个,并检查消息是否过期。如果是,则不要发送,如果不是,则发送消息。我可以让后端先将消息添加到 golang 队列,然后在同一代码块中真正将其发送出去。这样,我就不需要锁了。

2) 通过 zeromq 将包装器消息发送到检索器,检索器检查其是否已过期并提前返回。我不喜欢这样,因为它似乎对性能不利。

Riy*_*man 1

最后,解决方案是添加一个像@colini和@bazza提到的expires-at属性,并在每次心跳后从队列中删除超时消息。然而,事实证明,这样做并满足应用程序的所有要求比乍一看更困难,因此我最终使用了 RabbitMQ,其 ttl-expires 参数提供了所需的功能。