触发并忘记 goroutine golang

Ani*_*ish 8 go goroutine fire-and-forget

我编写了一个 API 来进行数据库调用并执行一些业务逻辑。我正在调用一个必须在后台执行某些操作的 goroutine。由于 API 调用不应等待此后台任务完成,因此我在调用 goroutine 后立即返回 200 OK(让我们假设后台任务永远不会给出任何错误。)

我读到,一旦 goroutine 完成其任务,goroutine 将被终止。这种一劳永逸的方式是否能避免 goroutine 泄漏?Goroutine 在执行任务后是否会终止并清理?

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
    // Some DB calls
    // Some business logics
    go func() {
        // some Task taking 5 sec
    }()
    w.WriteHeader(http.StatusOK)
}
Run Code Online (Sandbox Code Playgroud)

小智 6

我建议始终控制你的 goroutine,以避免内存和系统耗尽。如果您收到大量请求,并且开始不受控制地生成 goroutine,那么系统可能很快就会崩溃。

在需要立即返回 200Ok 的情况下,最好的方法是创建一个消息队列,因此服务器只需要在队列中创建一个作业并返回 ok 并忘记。其余的将由消费者异步处理。

生产者(HTTP 服务器)>>> 队列 >>> 消费者

通常,队列是外部资源(RabbitMQ、AWS SQS...),但出于教学目的,您可以使用通道作为消息队列来实现相同的效果。

在示例中,您将看到我们如何创建一个通道来通信 2 个进程。然后我们启动将从通道读取数据的工作进程,然后启动具有将写入通道的处理程序的服务器。

发送curl 请求时尝试调整缓冲区大小和作业时间。

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

/*
$ go run .

curl "http://localhost:8080?user_id=1"
curl "http://localhost:8080?user_id=2"
curl "http://localhost:8080?user_id=3"
curl "http://localhost:8080?user_id=....."

*/

func main() {

    queueSize := 10
    // This is our queue, a channel to communicate processes. Queue size is the number of items that can be stored in the channel
    myJobQueue := make(chan string, queueSize) // Search for 'buffered channels'

    // Starts a worker that will read continuously from our queue
    go myBackgroundWorker(myJobQueue)

    // We start our server with a handler that is receiving the queue to write to it
    if err := http.ListenAndServe("localhost:8080", myAsyncHandler(myJobQueue)); err != nil {
        panic(err)
    }
}

func myAsyncHandler(myJobQueue chan<- string) http.HandlerFunc {
    return func(rw http.ResponseWriter, r *http.Request) {
        // We check that in the query string we have a 'user_id' query param
        if userID := r.URL.Query().Get("user_id"); userID != "" {
            select {
            case myJobQueue <- userID: // We try to put the item into the queue ...
                rw.WriteHeader(http.StatusOK)
                rw.Write([]byte(fmt.Sprintf("queuing user process: %s", userID)))
            default: // If we cannot write to the queue it's because is full!
                rw.WriteHeader(http.StatusInternalServerError)
                rw.Write([]byte(`our internal queue is full, try it later`))
            }
            return
        }
        rw.WriteHeader(http.StatusBadRequest)
        rw.Write([]byte(`missing 'user_id' in query params`))
    }
}

func myBackgroundWorker(myJobQueue <-chan string) {
    const (
        jobDuration = 10 * time.Second // simulation of a heavy background process
    )

    // We continuosly read from our queue and process the queue 1 by 1.
    // In this loop we could spawn more goroutines in a controlled way to paralelize work and increase the read throughput, but i don't want to overcomplicate the example.
    for userID := range myJobQueue {
        // rate limiter here ...
        // go func(u string){
        log.Printf("processing user: %s, started", userID)
        time.Sleep(jobDuration)
        log.Printf("processing user: %s, finisehd", userID)
        // }(userID)
    }
}

Run Code Online (Sandbox Code Playgroud)


icz*_*cza 5

您无需处理“goroutine 清理”,您只需启动 goroutine,当作为 goroutine 启动的函数返回时,它们就会被清理。引用规范:Go 语句:

当函数终止时,它的 goroutine 也会终止。如果函数有任何返回值,它们将在函数完成时被丢弃。

所以你所做的一切都很好。但请注意,您启动的 goroutine 不能使用或假设有关请求 ( r) 和响应编写器 ( w) 的任何内容,您只能在从处理程序返回之前使用它们。

另请注意,您不必编写http.StatusOK,如果您从处理程序返回而不编写任何内容,则假定成功并HTTP 200 OK会自动发回。

请参阅相关/可能的重复:Webhook 进程在另一个 goroutine 上运行