Har*_*lar -1 webserver go confluent-kafka
我正在基准测试使用Go语言编写的简单Web服务器wrk。服务器正在具有4GB RAM的计算机上运行。在测试开始时,该代码每秒可处理2000个请求,因此性能非常好。但是随着时间的流逝,该进程使用的内存会逐渐增加,一旦达到85%(我正在使用进行检查top),吞吐量就会下降到约100个请求/秒。重新启动服务器后,吞吐量再次增加到最佳数量。
是由于内存问题导致性能下降吗?Go为什么不释放此内存?我的Go服务器如下所示:
func main() {
defer func() {
// Wait for all messages to drain out before closing the producer
p.Flush(1000)
p.Close()
}()
http.HandleFunc("/endpoint", handler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
Run Code Online (Sandbox Code Playgroud)
在处理程序中,我将传入的Protobuf消息转换为Json并使用融合的Kafka Go库将其写入Kafka。
var p, err = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "abc-0.com:6667,abc-1.com:6667",
"message.timeout.ms": "30000",
"sasl.kerberos.keytab": "/opt/certs/TEST.KEYTAB",
"sasl.kerberos.principal": "TEST@TEST.ABC.COM",
"sasl.kerberos.service.name": "kafka",
"security.protocol": "SASL_PLAINTEXT",
})
var topic = "test"
func handler(w http.ResponseWriter, r *http.Request) {
body, _ := ioutil.ReadAll(r.Body)
// Deserialize byte[] to Protobuf message
protoMessage := &tutorial.REALTIMEGPS{}
_ := proto.Unmarshal(body, protoMessage)
// Convert Protobuf to Json
realTimeJson, _ := convertProtoToJson(protoMessage)
_, err := fmt.Fprintf(w, "")
if err != nil {
log.Fatal(responseErr)
}
// Send to Kafka
produceMessage([]byte(realTimeJson))
}
func produceMessage(message []byte) {
// Delivery report
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Println("Delivery failed: ", ev.TopicPartition)
} else {
log.Println("Delivered message to ", ev.TopicPartition)
}
}
}
}()
// Send message
_ := p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: message,
}, nil)
}
func convertProtoToJson(pb proto.Message) (string, error) {
marshaler := jsonpb.Marshaler{}
json, err := marshaler.MarshalToString(pb)
return json, err
}
Run Code Online (Sandbox Code Playgroud)
问题在于,在每个请求的末尾,您都会调用produceMessage(),这会向kafka发送一条消息,并启动goroutine来接收用于检查错误的事件。
当传入的请求传入时,您的代码会不停地启动goroutines,并且直到您的kafka客户端出现问题后它们才结束。这将需要越来越多的内存,并且随着越来越多的goroutine被调度,可能会越来越多的CPU。
不要这样 单个goroutine就足以用于交付报告。p设置了变量后,启动一个goroutine ,就可以了。
例如:
var p *kafka.Producer
func init() {
var err error
p, err = kafka.NewProducer(&kafka.ConfigMap{
// ...
}
if err != nil {
// Handle error
}
// Delivery report
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Println("Delivery failed: ", ev.TopicPartition)
} else {
log.Println("Delivered message to ", ev.TopicPartition)
}
}
}
}()
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
79 次 |
| 最近记录: |