我正在开发一个使用 net/http 包调用客户端 URL 的 API。根据用户的国家/地区,每个请求(POST 调用)会在 goroutine 中并发调用 1 到 8 个 URL。在大约 1000-1500 qps 的较低负载下,应用程序工作正常;但当负载扩展到 3k qps 时,即使只调用 1 个客户端 URL,内存也会突然增加,应用程序会在几分钟后停止响应(响应时间远高于 50 秒)。我使用的是 Go 原生的 net/http 包和 gorilla/mux 路由器。关于这个问题的其他提问都说要关闭响应主体,而我已经这样做了:
// Build the POST request with requestBody as its payload.
// NOTE(review): this err is never inspected; the Client.Do call below assigns
// to the same variable before anything reads it.
req, err := http.NewRequest("POST", "client_post_url", bytes.NewBuffer(requestBody))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Connection", "Keep-Alive")
// Send the request through the shared client held in the common package.
response, err := common.Client.Do(req)
status := 0
if err != nil {//handle and return}
defer response.Body.Close() //used with/without io.Copy
status = response.StatusCode
// Read the entire response body into memory; the read error is discarded.
body, _ := ioutil.ReadAll(response.Body)
// Drain the body into ioutil.Discard. ReadAll above has already read the body
// to EOF on success, so there is normally nothing left to copy here.
_, err = io.Copy(ioutil.Discard, response.Body)
Run Code Online (Sandbox Code Playgroud)
我需要重用连接,因此我已经在这样的 init 方法中初始化了 http 客户端和传输全局变量。
// Shared transport, created once so that connections can be reused across calls.
// NOTE(review): MaxIdleConnsPerHost is not set, so the net/http default
// (http.DefaultMaxIdleConnsPerHost) applies — confirm that this is intended for
// a high-QPS fan-out to a small number of hosts.
common.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
// Server certificates are not verified.
InsecureSkipVerify: true,
},
DialContext: (&net.Dialer{
//Timeout: time.Duration(300) * time.Millisecond,
// TCP keep-alive probe interval for dialed connections.
KeepAlive: 30 * time.Second,
}).DialContext,
//ForceAttemptHTTP2: true,
// HTTP keep-alives stay enabled (false is also the zero value).
DisableKeepAlives: false,
//MaxIdleConns: 0,
//IdleConnTimeout: 0,
//TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
//ExpectContinueTimeout: 1 * time.Second,
}
// Shared client: every request made through it is bounded by a 300ms timeout.
common.Client = &http.Client{
Timeout: time.Duration(300) * time.Millisecond,
Transport: common.Transport,
}
Run Code Online (Sandbox Code Playgroud)
我读到过使用 keep-alive 会导致内存泄漏,也尝试了在请求上禁用 keep-alive、设置 Close 标志等几种组合,但似乎都没有效果。此外,如果我不发起任何 http 调用,而是在并发调用每个 url 的 goroutine 中改用 time.Sleep(300 * time.Millisecond),应用程序可以正常工作,不会出现任何泄漏。所以我确定问题与 client/http 包有关:在高负载下连接没有被释放,或者使用方式不正确。
我应该采取什么方法来实现这一目标?创建自定义服务器和自定义处理程序类型来接受请求和路由请求是否会像几篇文章中 C10K 方法中提到的那样工作?如果需要,我可以与所有详细信息共享示例代码。上面只是补充了我觉得问题所在的部分。
这是一个有代表性的代码
main.go
package main
import (
"./common"
"bytes"
"crypto/tls"
"fmt"
"github.com/gorilla/mux"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
"runtime"
"strconv"
"sync"
"time"
)
// init configures the shared outgoing HTTP client and the scheduler before
// main runs.
//
// An optional first command-line argument sets GOMAXPROCS. When it is absent,
// or is not a positive integer, the number of logical CPUs is used instead.
func init() {
	numCPU := runtime.NumCPU()
	// A single argument is enough to override the CPU count; requiring two
	// would silently ignore "app 4".
	if args := os.Args[1:]; len(args) > 0 {
		n, err := strconv.Atoi(args[0])
		if err != nil || n < 1 {
			log.Printf("ignoring invalid CPU count %q, using %d", args[0], numCPU)
		} else {
			numCPU = n
		}
	}

	// One transport for the whole process so connections can be reused.
	// NOTE(review): MaxIdleConnsPerHost is left at the net/http default; with a
	// high-QPS fan-out to a few hosts that default may force frequent re-dials.
	common.Transport = &http.Transport{
		TLSClientConfig: &tls.Config{
			// Server certificates are not verified.
			InsecureSkipVerify: true,
		},
		DialContext: (&net.Dialer{
			//Timeout: time.Duration() * time.Millisecond,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		//ForceAttemptHTTP2: true,
		DisableKeepAlives: false,
		//MaxIdleConns: 0,
		//IdleConnTimeout: 0,
		//TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
		//ExpectContinueTimeout: 1 * time.Second,
	}
	// Every outgoing call is bounded by a 300ms end-to-end timeout.
	common.Client = &http.Client{
		Timeout:   time.Duration(300) * time.Millisecond,
		Transport: common.Transport,
	}

	runtime.GOMAXPROCS(numCPU)
	rand.Seed(time.Now().UTC().UnixNano())
}
// main wires the routes and serves them on port 80.
//
// Application routes live on a gorilla/mux router wrapped in a 500ms
// TimeoutHandler. The pprof endpoints are mounted on an outer ServeMux
// instead, for two reasons: a mux route for "/debug/pprof/" is an exact match
// and would not serve named profiles such as /debug/pprof/heap, and the CPU
// profile and trace endpoints run far longer than 500ms.
func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		_, _ = fmt.Fprintf(w, "Hello!!!")
	})
	router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
		vars := mux.Vars(r)
		prepareRequest(w, r, vars["name"])
	}).Methods("POST")

	root := http.NewServeMux()
	// The trailing-slash pattern is a subtree match, so pprof.Index also
	// serves the named profiles (heap, goroutine, ...).
	root.HandleFunc("/debug/pprof/", pprof.Index)
	root.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	root.HandleFunc("/debug/pprof/profile", pprof.Profile)
	root.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	root.HandleFunc("/debug/pprof/trace", pprof.Trace)
	// Everything else goes to the application router, bounded to 500ms.
	root.Handle("/", http.TimeoutHandler(router, 500*time.Millisecond, "Timeout"))

	srv := &http.Server{
		Addr: "0.0.0.0:" + "80",
		// Server-level timeouts are intentionally left disabled, as before:
		//ReadTimeout: 500 * time.Millisecond,
		//WriteTimeout: 500 * time.Millisecond,
		//IdleTimeout: 10 * time.Second,
		Handler: root,
	}
	log.Fatal(srv.ListenAndServe())
}
// prepareRequest fans the incoming request out to the client URLs and passes
// the collected outcomes to finalCall. The logic that selects the URLs is
// omitted in this sample, so the target list is empty.
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
	var targets []string
	responses, messages, codes := callUrls(targets)
	finalCall(w, responses, messages, codes)
}
// Response is the outcome of one client URL call.
type Response struct {
	// Status is the status code recorded for the call.
	Status int
	// Url is the address that was called; Body is the response payload as text.
	Url, Body string
}
// callUrls POSTs the sample JSON payload to every URL concurrently and gathers
// the outcomes.
//
// It returns three parallel slices — the responses, one message per response
// and one status string per response — in completion order, not input order.
// An empty urls slice yields three empty slices instead of blocking.
func callUrls(urls []string) ([]*Response, []string, []string) {
	type outcome struct {
		resp   *Response
		msg    string
		status string
	}

	// Buffered to len(urls) so that no worker ever blocks on send.
	ch := make(chan outcome, len(urls))

	var wg sync.WaitGroup
	wg.Add(len(urls))
	for _, url := range urls {
		go func(url string) {
			defer wg.Done()
			resp, msg, status := postURL(url)
			ch <- outcome{resp: resp, msg: msg, status: status}
		}(url)
	}

	// Every worker has sent its outcome by the time Wait returns, so closing
	// here is safe and lets the range below terminate, also for zero URLs.
	wg.Wait()
	close(ch)

	results := make([]*Response, 0, len(urls))
	msg := make([]string, 0, len(urls))
	status := make([]string, 0, len(urls))
	for o := range ch {
		results = append(results, o.resp)
		msg = append(msg, o.msg)
		status = append(status, o.status)
	}
	return results, msg, status
}

// postURL performs a single POST through common.Client and reports it as
// (response, message, status). Any failure — building the request, the round
// trip, or reading the body — is reported as a 500 "error" outcome.
func postURL(url string) (*Response, string, string) {
	// Placeholder for the per-client country/OS eligibility check.
	isValid := true
	if !isValid {
		return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"
	}

	failed := func() (*Response, string, string) {
		return &Response{Status: 500, Url: url, Body: ""}, "error", "500"
	}

	// The real request body has many more parameters; this is a sample.
	req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))
	if err != nil {
		// req is nil here; touching its headers would panic.
		return failed()
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Connection", "Keep-Alive")

	response, err := common.Client.Do(req)
	if err != nil {
		return failed()
	}
	defer func() {
		// Drain whatever is left so the connection can go back to the idle
		// pool, then close exactly once.
		_, _ = io.Copy(ioutil.Discard, response.Body)
		_ = response.Body.Close()
	}()

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		// A truncated body (e.g. client timeout mid-read) is not a success.
		return failed()
	}

	// NOTE(review): response.StatusCode is not consulted; every completed
	// round trip is reported as 200, matching the previous behaviour.
	return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"
}
// finalCall prints the collected outcomes to standard output. Nothing is
// written to w in this sample.
func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {
	fmt.Fprintln(os.Stdout, "response", "response body", results, msg, status)
}
Run Code Online (Sandbox Code Playgroud)
vars.go
package common
import (
"net/http"
)
var (
//http client
Client *http.Client
//http Transport
Transport *http.Transport
)
Run Code Online (Sandbox Code Playgroud)
pprof:具有 4 个客户端 url 的配置文件,平均约为 2500qps。
不调用客户端 url,通过保持isValid = false并且time.Sleep(300* time.Millisecond)不会发生泄漏。

这段代码没有泄漏。
为了演示,让我们稍微修改一下代码**,使帖子中的问题可以复现。
main.go
package main
import (
"bytes"
"crypto/tls"
_ "expvar"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
"strconv"
"sync"
"time"
"github.com/gorilla/mux"
)
var (
//http client
Client *http.Client
//http Transport
Transport *http.Transport
)
// init starts the debug HTTP listener, configures the shared outgoing HTTP
// client and sets up the scheduler before main runs.
//
// An optional first command-line argument sets GOMAXPROCS. When it is absent,
// or is not a positive integer, the number of logical CPUs is used instead.
func init() {
	// Serve http.DefaultServeMux on localhost:6060; the blank imports of
	// expvar and net/http/pprof register their handlers there.
	go func() {
		// ListenAndServe only returns on failure (for example, port in use);
		// report it instead of losing the debug endpoints silently.
		if err := http.ListenAndServe("localhost:6060", nil); err != nil {
			log.Printf("debug listener on localhost:6060: %v", err)
		}
	}()

	numCPU := runtime.NumCPU()
	// A single argument is enough to override the CPU count; requiring two
	// would silently ignore "app 4".
	if args := os.Args[1:]; len(args) > 0 {
		n, err := strconv.Atoi(args[0])
		if err != nil || n < 1 {
			log.Printf("ignoring invalid CPU count %q, using %d", args[0], numCPU)
		} else {
			numCPU = n
		}
	}

	// One transport for the whole process so connections can be reused.
	Transport = &http.Transport{
		TLSClientConfig: &tls.Config{
			// Server certificates are not verified.
			InsecureSkipVerify: true,
		},
		DialContext: (&net.Dialer{
			//Timeout: time.Duration() * time.Millisecond,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		//ForceAttemptHTTP2: true,
		DisableKeepAlives: false,
		//MaxIdleConns: 0,
		//IdleConnTimeout: 0,
		//TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
		//ExpectContinueTimeout: 1 * time.Second,
	}
	// The client timeout is deliberately left disabled in this variant.
	Client = &http.Client{
		// Timeout: time.Duration(300) * time.Millisecond,
		Transport: Transport,
	}

	runtime.GOMAXPROCS(numCPU)
	rand.Seed(time.Now().UTC().UnixNano())
}
func main() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, _ = fmt.Fprintf(w, "Hello!!!")
})
router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
prepareRequest(w, r, vars["name"])
}).Methods("POST", "GET")
// Register pprof handlers
// router.HandleFunc("/debug/pprof/", pprof.Index)
// router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
// router.HandleFunc("/debug/pprof/profile", pprof.Profile)
// router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
// router.HandleFunc("/debug/pprof/trace", pprof.Trace)
routerMiddleWare := http.TimeoutHandler(router, 500*time.Millisecond, "Timeout")
srv := &http.Server{
Addr: "localhost:8080",
/*ReadTimeout: 500 * time.Millisecond,
WriteTimeout: 500 * time.Millisecond,
IdleTimeout: 10 * time.Second,*/
Handler: routerMiddleWare,
}
log.Fatal(srv.ListenAndServe())
}
// prepareRequest calls a fixed pair of backend URLs and passes the collected
// outcomes to finalCall.
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
	// Uncomment to create a deliberate leak for the demonstration: the
	// goroutine blocks forever sending on a channel that nobody receives from.
	// go func() {
	// make(chan []byte) <- make([]byte, 10024)
	// }()

	// Static target list (len=2) standing in for the real URL selection.
	targets := []string{
		"http://localhost:7000/",
		"http://localhost:7000/",
	}
	responses, messages, codes := callUrls(targets)
	finalCall(w, responses, messages, codes)
}
// Response is the outcome of one backend URL call.
type Response struct {
	// Status is the status code recorded for the call.
	Status int
	// Url is the address that was called; Body is the response payload as text.
	Url, Body string
}
// callUrls POSTs the sample JSON payload to every URL concurrently and gathers
// the outcomes.
//
// It returns three parallel slices — the responses, one message per response
// and one status string per response — in completion order, not input order.
// An empty urls slice yields three empty slices instead of blocking.
func callUrls(urls []string) ([]*Response, []string, []string) {
	type outcome struct {
		resp   *Response
		msg    string
		status string
	}

	// Buffered to len(urls) so that no worker ever blocks on send.
	ch := make(chan outcome, len(urls))

	var wg sync.WaitGroup
	wg.Add(len(urls))
	for _, url := range urls {
		go func(url string) {
			defer wg.Done()
			resp, msg, status := postURL(url)
			ch <- outcome{resp: resp, msg: msg, status: status}
		}(url)
	}

	// Every worker has sent its outcome by the time Wait returns, so closing
	// here is safe and lets the range below terminate, also for zero URLs.
	wg.Wait()
	close(ch)

	results := make([]*Response, 0, len(urls))
	msg := make([]string, 0, len(urls))
	status := make([]string, 0, len(urls))
	for o := range ch {
		results = append(results, o.resp)
		msg = append(msg, o.msg)
		status = append(status, o.status)
	}
	return results, msg, status
}

// postURL performs a single POST through Client and reports it as
// (response, message, status). On failure the message is the error text and
// the status is "500".
func postURL(url string) (*Response, string, string) {
	// Placeholder for the per-client country/OS eligibility check.
	isValid := true
	if !isValid {
		return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"
	}

	failed := func(err error) (*Response, string, string) {
		return &Response{Status: 500, Url: url, Body: ""}, err.Error(), "500"
	}

	// The real request body has many more parameters; this is a sample.
	req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`)))
	if err != nil {
		return failed(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Connection", "Keep-Alive")

	response, err := Client.Do(req)
	if err != nil {
		return failed(err)
	}
	defer func() {
		// Drain whatever is left so the connection can go back to the idle
		// pool, then close exactly once.
		_, _ = io.Copy(ioutil.Discard, response.Body)
		_ = response.Body.Close()
	}()

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		// A truncated body is not a success.
		return failed(err)
	}

	// NOTE(review): response.StatusCode is not consulted; every completed
	// round trip is reported as 200, matching the previous behaviour.
	return &Response{Status: 200, Url: url, Body: string(body)}, "success", "200"
}
// finalCall prints the collected outcomes to standard output. Nothing is
// written to w in this sample.
func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {
	fmt.Fprintln(os.Stdout, "response", "response body", results, msg, status)
}
Run Code Online (Sandbox Code Playgroud)
k/main.go
package main
import "net/http"
// main runs a stub backend on :7000 that answers every request with status
// 200 and a 100-byte zero-filled body.
func main() {
	y := make([]byte, 100)
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		// A failed write means the client went away; there is nothing to do.
		_, _ = w.Write(y)
	})
	// ListenAndServe only returns on failure (for example, port already in
	// use). Fail loudly so a load test is not run against a missing backend.
	if err := http.ListenAndServe(":7000", nil); err != nil {
		panic(err)
	}
}
Run Code Online (Sandbox Code Playgroud)
安装额外的可视化工具,并使用 ab 模拟一些负载,就足以做直观的演示。
go get -u github.com/divan/expvarmon
go run main.go &
go run k/main.go &
ab -n 50000 -c 2500 http://localhost:8080/y
# in a different window, for live preview
expvarmon -ports=6060 -i 500ms
Run Code Online (Sandbox Code Playgroud)
此时可以查看 expvarmon 的输出;如果是实时查看,你会看到类似下面的内容
你可以看到东西在摇晃,gc 正在积极工作。
应用程序已加载,内存正在被消耗,等待服务器释放其 conn 和 gc 来清理它们
你可以看到memstats.Alloc, memstats.HeapAlloc,memstats.HeapInuse现在减少了,正如预期的那样,当 gc 完成他的工作并且不存在泄漏时。
如果在 ab 运行结束后立即用 go tool pprof -inuse_space -web http://localhost:6060/debug/pprof/heap 进行检查
它表明该应用程序正在使用177Mb内存。
其中大部分(102Mb)由 net/http.Transport.getConn 占用。
您的处理程序负责1Mb,其余的是各种需要的东西。
如果您在服务器和 gc 发布后截取屏幕截图,您会看到一个更小的图表。这里不演示。
现在让我们生成一个泄漏并再次使用这两种工具查看它。
在代码中取消注释,
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
go func() {
make(chan []byte) <- make([]byte, 10024)
}()
//...
Run Code Online (Sandbox Code Playgroud)
重新启动应用程序(在 expvarmon 中按 q 退出,不过这不是必需的)
go get -u github.com/divan/expvarmon
go run main.go &
go run k/main.go &
ab -n 50000 -c 2500 http://localhost:8080/y
# in a different window, for live preview
expvarmon -ports=6060 -i 500ms
Run Code Online (Sandbox Code Playgroud)
表明
在 expvarmon 中可以看到同样的行为,只是数字变了;而在空闲状态下,经过 gc 之后,它仍然占用大量内存,比作为对比基准的空 golang http 服务器多得多。
再次对堆进行截图,可以看到您的处理程序现在消耗了大部分内存(约 450Mb)。注意箭头:它显示有 452Mb 来自 10kb 的分配,4.50Mb 来自 96b 的分配。它们分别对应于被推送到 chan []byte 的 []byte 切片。
最后,您可以检查堆栈跟踪以查找死的 goroutines,从而泄漏内存,打开http://localhost:6060/debug/pprof/goroutine?debug=1
goroutine profile: total 50012
50000 @ 0x43098f 0x4077fa 0x4077d0 0x4074bb 0x76b85d 0x45d281
# 0x76b85c main.prepareRequest.func1+0x4c /home/mh-cbon/gow/src/test/oom/main.go:101
4 @ 0x43098f 0x42c09a 0x42b686 0x4c3a3b 0x4c484b 0x4c482c 0x57d94f 0x590d79 0x6b4c67 0x5397cf 0x53a51d 0x53a754 0x6419ef 0x6af18d 0x6af17f 0x6b5f33 0x6ba4fd 0x45d281
# 0x42b685 internal/poll.runtime_pollWait+0x55 /home/mh-cbon/.gvm/gos/go1.12.7/src/runtime/netpoll.go:182
# 0x4c3a3a internal/poll.(*pollDesc).wait+0x9a /home/mh-cbon/.gvm/gos/go1.12.7/src/internal/poll/fd_poll_runtime.go:87
// more...
Run Code Online (Sandbox Code Playgroud)
它告诉我们该程序中存在 50 012 个 goroutine,并按文件位置分组列出;每组开头的数字是正在运行的实例数,本示例第一组为 50 000。紧随其后的是导致该 goroutine 存在的堆栈跟踪。
你可以看到有一堆系统的东西,在你的情况下,你不应该太担心它。
您需要找出的,是那些在程序按您预期的方式运行时本不应该仍然存活的 goroutine。
不过总体而言,您的代码并不令人满意,可以(而且很可能应该)通过彻底审查其内存分配和整体设计来加以改进。
** 这是应用于原始源代码的更改的摘要。
- 新增 k/main.go,用来充当后端服务器。
- 新增 _ "expvar" 导入语句。
- 通过 go http.ListenAndServe("localhost:6060", nil) 启动标准库的 HTTP 服务器实例,pprof 在 init 阶段会注册到该实例上。
- 禁用了客户端超时 Timeout: time.Duration(300) * time.Millisecond,,否则负载测试不会返回 200。
- 将服务器地址设置为 Addr: "localhost:8080",。
- 在 prepareRequest 中创建的 urls 值被设置为 len=2 的静态列表。
- 为 req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(`{"body":"param"}`))) 添加了错误检查。
- 禁用了 io.Copy(ioutil.Discard, response.Body) 的错误检查。

use*_*276 -1
我通过用 fasthttp 替换 net/http 包解决了这个问题。之前我没有使用它,是因为在 fasthttp 客户端上找不到设置超时的方法;后来我发现 fasthttp 客户端确实有一个 DoTimeout 方法,可以在指定的持续时间后使请求超时。
这里是更新的代码:
在vars.go中 ClientFastHttp *fasthttp.Client
主程序
package main
import (
"./common"
"crypto/tls"
"fmt"
"github.com/gorilla/mux"
"github.com/valyala/fasthttp"
"log"
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
"runtime"
"strconv"
"sync"
"time"
)
// init configures the shared outgoing HTTP clients (net/http and fasthttp)
// and the scheduler before main runs.
//
// An optional first command-line argument sets GOMAXPROCS. When it is absent,
// or is not a positive integer, the number of logical CPUs is used instead.
func init() {
	numCPU := runtime.NumCPU()
	// A single argument is enough to override the CPU count; requiring two
	// would silently ignore "app 4".
	if args := os.Args[1:]; len(args) > 0 {
		n, err := strconv.Atoi(args[0])
		if err != nil || n < 1 {
			log.Printf("ignoring invalid CPU count %q, using %d", args[0], numCPU)
		} else {
			numCPU = n
		}
	}

	// One transport for the whole process so connections can be reused.
	common.Transport = &http.Transport{
		TLSClientConfig: &tls.Config{
			// Server certificates are not verified.
			InsecureSkipVerify: true,
		},
		DialContext: (&net.Dialer{
			//Timeout: time.Duration() * time.Millisecond,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		//ForceAttemptHTTP2: true,
		DisableKeepAlives: false,
		//MaxIdleConns: 0,
		//IdleConnTimeout: 0,
		//TLSHandshakeTimeout: time.Duration(300) * time.Millisecond,
		//ExpectContinueTimeout: 1 * time.Second,
	}
	common.Client = &http.Client{
		Timeout:   time.Duration(300) * time.Millisecond,
		Transport: common.Transport,
	}

	// callUrls sends through common.ClientFastHttp, so it must be non-nil
	// before the first request arrives. The TLS settings mirror the net/http
	// transport above; the per-call timeout is passed to DoTimeout.
	common.ClientFastHttp = &fasthttp.Client{
		TLSConfig: &tls.Config{
			InsecureSkipVerify: true,
		},
	}

	runtime.GOMAXPROCS(numCPU)
	rand.Seed(time.Now().UTC().UnixNano())
}
// main wires the routes and serves them on port 80.
//
// Application routes live on a gorilla/mux router wrapped in a 500ms
// TimeoutHandler. The pprof endpoints are mounted on an outer ServeMux
// instead, for two reasons: a mux route for "/debug/pprof/" is an exact match
// and would not serve named profiles such as /debug/pprof/heap, and the CPU
// profile and trace endpoints run far longer than 500ms.
func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		_, _ = fmt.Fprintf(w, "Hello!!!")
	})
	router.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
		vars := mux.Vars(r)
		prepareRequest(w, r, vars["name"])
	}).Methods("POST")

	root := http.NewServeMux()
	// The trailing-slash pattern is a subtree match, so pprof.Index also
	// serves the named profiles (heap, goroutine, ...).
	root.HandleFunc("/debug/pprof/", pprof.Index)
	root.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	root.HandleFunc("/debug/pprof/profile", pprof.Profile)
	root.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	root.HandleFunc("/debug/pprof/trace", pprof.Trace)
	// Everything else goes to the application router, bounded to 500ms.
	root.Handle("/", http.TimeoutHandler(router, 500*time.Millisecond, "Timeout"))

	srv := &http.Server{
		Addr: "0.0.0.0:" + "80",
		// Server-level timeouts are intentionally left disabled, as before:
		//ReadTimeout: 500 * time.Millisecond,
		//WriteTimeout: 500 * time.Millisecond,
		//IdleTimeout: 10 * time.Second,
		Handler: root,
	}
	log.Fatal(srv.ListenAndServe())
}
// prepareRequest fans the incoming request out to the client URLs and passes
// the collected outcomes to finalCall. The logic that selects the URLs is
// omitted in this sample, so the target list is empty.
func prepareRequest(w http.ResponseWriter, r *http.Request, name string) {
	var targets []string
	responses, messages, codes := callUrls(targets)
	finalCall(w, responses, messages, codes)
}
// Response is the outcome of one client URL call.
type Response struct {
	// Status is the status code recorded for the call.
	Status int
	// Url is the address that was called; Body is the response payload as text.
	Url, Body string
}
// callUrls POSTs the sample JSON payload to every URL concurrently through the
// shared fasthttp client and gathers the outcomes.
//
// It returns three parallel slices — the responses, one message per response
// and one status string per response — in completion order, not input order.
// An empty urls slice yields three empty slices instead of blocking.
func callUrls(urls []string) ([]*Response, []string, []string) {
	type outcome struct {
		resp   *Response
		msg    string
		status string
	}

	// Buffered to len(urls) so that no worker ever blocks on send.
	ch := make(chan outcome, len(urls))

	var wg sync.WaitGroup
	wg.Add(len(urls))
	for _, url := range urls {
		go func(url string) {
			defer wg.Done()
			resp, msg, status := postURL(url)
			ch <- outcome{resp: resp, msg: msg, status: status}
		}(url)
	}

	// Every worker has sent its outcome by the time Wait returns, so closing
	// here is safe and lets the range below terminate, also for zero URLs.
	wg.Wait()
	close(ch)

	results := make([]*Response, 0, len(urls))
	msg := make([]string, 0, len(urls))
	status := make([]string, 0, len(urls))
	for o := range ch {
		results = append(results, o.resp)
		msg = append(msg, o.msg)
		status = append(status, o.status)
	}
	return results, msg, status
}

// postURL performs a single POST with a 300ms timeout and reports it as
// (response, message, status). A failed call is reported as a 500 "error"
// outcome.
func postURL(url string) (*Response, string, string) {
	// Placeholder for the per-client country/OS eligibility check.
	isValid := true
	if !isValid {
		return &Response{Status: 500, Url: url, Body: ""}, "invalid", "500"
	}

	// Pooled request/response objects; both go back to the pool on return.
	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(resp)

	// The real request body has many more parameters; this is a sample.
	req.SetRequestURI(url)
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Connection", "Keep-Alive")
	req.Header.SetMethod("POST")
	req.SetBody([]byte(`{"body":"param"}`))

	if err := common.ClientFastHttp.DoTimeout(req, resp, 300*time.Millisecond); err != nil {
		return &Response{Status: 500, Url: url, Body: ""}, "error", "500"
	}

	// resp.Body() aliases the pooled response buffer, which is recycled once
	// ReleaseResponse runs. Copy it into a string now rather than handing the
	// slice to the caller, who reads it after this function has returned.
	body := string(resp.Body())

	// NOTE(review): resp.StatusCode() is not consulted; every completed round
	// trip is reported as 200, matching the previous behaviour.
	return &Response{Status: 200, Url: url, Body: body}, "success", "200"
}
// finalCall prints the collected outcomes to standard output. Nothing is
// written to w in this sample.
func finalCall(w http.ResponseWriter, results []*Response, msg []string, status []string) {
	fmt.Fprintln(os.Stdout, "response", "response body", results, msg, status)
}
Run Code Online (Sandbox Code Playgroud)