通过 RoundTrip 的 Go 速率限制 http.client 超出限制并产生致命恐慌

Kri*_*urg 5 timeout http go rate-limiting

我的目标:设置每分钟 600 个请求的速率限制,并在下一分钟重置。http.client我的目的是通过设置 aRoundTrip和 a来做到这一点limit.wait()。这样我就可以为不同的情况设置不同的限制http.clients(),并通过处理限制roundtrip,而不是在其他地方增加代码的复杂性。

问题是没有遵守速率限制,我仍然超出了允许的请求数量并且设置超时会产生致命的恐慌net/http: request canceled (Client.Timeout exceeded while awaiting headers)

我创建了一个main.go可以复制该问题的准系统。请注意,64000 循环对我来说是一个现实场景。

更新:设置ratelimiter: rate.NewLimiter(10, 10),仍然超出 600 速率限制,并Context deadline exceeded在设置超时时产生错误。

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
    "time"

    "golang.org/x/time/rate"
)

var client http.Client

// ThrottledTransport Rate Limited HTTP Client
type ThrottledTransport struct {
    roundTripperWrap http.RoundTripper
    ratelimiter      *rate.Limiter
}

func (c *ThrottledTransport) RoundTrip(r *http.Request) (*http.Response, error) {
    err := c.ratelimiter.Wait(r.Context()) // This is a blocking call. Honors the rate limit
    if err != nil {
        return nil, err
    }
    return c.roundTripperWrap.RoundTrip(r)
}

// NewRateLimitedTransport wraps transportWrap with a rate limitter
func NewRateLimitedTransport(transportWrap http.RoundTripper) http.RoundTripper {
    return &ThrottledTransport{
        roundTripperWrap: transportWrap,
        //ratelimiter:      rate.NewLimiter(rate.Every(limitPeriod), requestCount),
        ratelimiter: rate.NewLimiter(10, 10),
    }
}

func main() {
    concurrency := 20
    var ch = make(chan int, concurrency)
    var wg sync.WaitGroup

    wg.Add(concurrency)
    for i := 0; i < concurrency; i++ {
        go func() {
            for {
                a, ok := <-ch
                if !ok { // if there is nothing to do and the channel has been closed then end the goroutine
                    wg.Done()
                    return
                }
                resp, err := client.Get("https://api.guildwars2.com/v2/items/12452")
                if err != nil {
                    fmt.Println(err)
                }
                body, err := ioutil.ReadAll(resp.Body)
                if err != nil {
                    fmt.Println(err)
                }
                fmt.Println(a, ":", string(body[4:29]))
            }
        }()
    }
    client = http.Client{}
    client.Timeout = time.Second * 10

    // Rate limits 600 requests per 60 seconds via RoundTripper
    transport := NewRateLimitedTransport(http.DefaultTransport)
    client.Transport = transport

    for i := 0; i < 64000; i++ {
        ch <- i // add i to the queue
    }

    wg.Wait()
    fmt.Println("done")
}

Run Code Online (Sandbox Code Playgroud)

Zek*_* Lu 3

rate.NewLimiter(rate.Every(60*time.Second), 600)不是你想要的。

根据https://pkg.go.dev/golang.org/x/time/rate#Limiter

限制器控制允许事件发生的频率。它实现了一个大小为 b 的“令牌桶”,最初已满,并以每秒 r 个令牌的速率重新填充。非正式地,在任何足够大的时间间隔中,限制器将速率限制为每秒 r 个令牌,最大突发大小为 b 个事件


func NewLimiter(r Limit, b int) *Limiter

NewLimiter 返回一个新的限制器,允许事件达到速率 r 并允许最多 b 个令牌的突发。


func Every(间隔时间.持续时间) 限制

Every 将事件之间的最小时间间隔转换为限制。

rate.Every(60*time.Second)意味着每 60 秒就会向桶中填充 1 个令牌。即,速率是1/60每秒的令牌数。

大多数情况下,600 requests per minute意味着600一开始就允许请求,并且会立即重置为600下一分钟。在我看来,golang.org/x/time/rate不太适合这个用例。也许rate.NewLimiter(10, 10)是一个安全的选择。