Golang TCP connection slow when the server has no available backlog

bbg*_*bbg 5 c tcp go

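Update

After adding a pthreaded C client, the problem reappeared. This suggests the long connection times are part of the TCP protocol rather than of any particular implementation. Changing that protocol behavior does not seem easy.

Original question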

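I believe my question is mostly: what does the Golang `net` package do when it tries to connect to a server over TCP and:

  1. The server has no connections available, even in the backlog.
  2. The connection is not refused/failed.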

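There seems to be a large overhead in that connection, with server response times ramping up from 5 ms to several seconds. This was seen both in a production environment and in the minimal example below. The proper solution is to use connection pools to the server, and that will be implemented. This question is largely my own curiosity.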

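To reproduce:

  1. Run the server with backlog = 1, then run client.go.
      • All 50 goroutines fire at once, with a total completion time of almost 2 minutes.
  2. Run the server with backlog = 100, then run client.go.
      • All 50 goroutines fire at once, queue up connected to the server, and complete in ~260 ms.
  3. Running three C clients with 50 µs retry times completed connections within 12 ms on average, so the C clients did not show this issue.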

Example output for backlog = 1 (the first time is the dial time, the second is the completion time):

```
user@computer ~/tcp-tests $ go run client.go 127.0.0.1:46999
Long Elapsed Time: 216.579µs, 315.196µs
Long Elapsed Time: 274.169µs, 5.970873ms
Long Elapsed Time: 74.4µs, 10.753871ms
Long Elapsed Time: 590.965µs, 205.851066ms
Long Elapsed Time: 1.029287689s, 1.029574065s
Long Elapsed Time: 1.02945649s, 1.035098229s
...
Long Elapsed Time: 3.045881865s, 6.378597166s
Long Elapsed Time: 3.045314838s, 6.383783688s
Time taken stats: 2.85 +/- 1.59 s // average +/- STDEV
Main Taken: 6.384677948s
```

Example output for backlog = 100:

```
...
Long Elapsed Time: 330.098µs, 251.004077ms
Long Elapsed Time: 298.146µs, 256.187795ms
Long Elapsed Time: 315.832µs, 261.523685ms
Time taken stats: 0.13 +/- 0.08 s
Main Taken: 263.186955ms
```

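So what exactly is going on in `net.DialTCP` that makes the dial time grow? (We also tried other flavors of dial, with no discernible difference.)

  • Polling time between attempts to establish the connection?
  • An RFC 5681 global congestion-control variable (possibly guarded by a mutex?) that gets incremented on every initial failed connection attempt?
  • Something else?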

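I'm leaning towards the first two, because the 1 s, 3 s, 5 s values look like magic numbers. They show up both on my modest local machine and in a large-scale production environment.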

Here is the minimal server written in C. The configuration value of interest is the backlog argument to `listen`.

```c
/*
    Adapted from
    https://www.geeksforgeeks.org/tcp-server-client-implementation-in-c/

    Compile and run with:
        gcc server.c -o server; ./server
*/
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/time.h>

int main(void)
{
    int socket_desc, client_sock;
    socklen_t client_size;
    struct sockaddr_in server_addr, client_addr;
    char server_message[2000], client_message[2000];

    // Clean buffers:
    memset(server_message, '\0', sizeof(server_message));
    memset(client_message, '\0', sizeof(client_message));

    // Create socket:
    socket_desc = socket(AF_INET, SOCK_STREAM, 0);

    if(socket_desc < 0){
        printf("Error while creating socket\n");
        return -1;
    }
    printf("Socket created successfully\n");

    // Set port and IP:
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(46999);
    server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    // Bind to the set port and IP:
    if(bind(socket_desc, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0){
        printf("Couldn't bind to the port\n");
        return -1;
    }
    printf("Done with binding\n");

    // Listen for clients:
    // Increasing the backlog allows the Go client to connect and wait
    // rather than poll/retry.
    if(listen(socket_desc, 100) < 0){
        printf("Error while listening\n");
        return -1;
    }
    printf("\nListening for incoming connections.....\n");

    // Accept incoming connections until a "stop" message arrives:
    int server_run = 1;
    do
    {
        struct timeval start, end;
        double elapsed_ms;
        gettimeofday(&start, NULL);
        client_size = sizeof(client_addr);
        client_sock = accept(socket_desc, (struct sockaddr*)&client_addr, &client_size);

        if (client_sock < 0){
            printf("Can't accept\n");
            return -1;
        }

        // Receive client's message, leaving room for a terminating NUL:
        ssize_t n = recv(client_sock, client_message, sizeof(client_message) - 1, 0);
        if (n < 0){
            printf("Couldn't receive\n");
            return -1;
        }
        client_message[n] = '\0';
        if (strcmp(client_message, "stop") == 0)
        {
            server_run = 0;
            printf("Received stop message.\n");
        }

        // Respond to client:
        strcpy(server_message, "This is the server's message.");

        if (send(client_sock, server_message, strlen(server_message), 0) < 0){
            printf("Can't send\n");
            return -1;
        }

        // Sleep for 5 ms to simulate work:
        usleep(5000);

        // Closing the socket:
        close(client_sock);
        gettimeofday(&end, NULL);
        // Include tv_sec so the result does not wrap at one-second boundaries:
        elapsed_ms = (end.tv_sec - start.tv_sec) * 1000.0
                   + (end.tv_usec - start.tv_usec) / 1000.0;
        printf("Server Time: %.4f ms\n", elapsed_ms);
    } while(server_run);

    close(socket_desc);

    return 0;
}
```

Here is the test Go client:

```go
/*
    Adapted from
    https://www.linode.com/docs/guides/developing-udp-and-tcp-clients-and-servers-in-go/

    Run once the server.c is compiled and running with:
        go run client.go 127.0.0.1:46999
*/
package main

import (
    "fmt"
    "net"
    "os"
    "sync"
    "time"

    "github.com/montanaflynn/stats"
)

func do_message(wg *sync.WaitGroup, connect string, time_taken *float64) {
    defer wg.Done()
    message := make([]byte, 128)
    start_time := time.Now()
    pAddr, err := net.ResolveTCPAddr("tcp", connect)
    if err != nil {
        return
    }

    c, err := net.DialTCP("tcp", nil, pAddr)
    if err != nil {
        fmt.Println(err)
        return
    }
    c.SetLinger(0)
    dialed_time := time.Since(start_time)

    defer func() {
        c.Close()
        elapsed_time := time.Since(start_time)
        if elapsed_time.Microseconds() > 60 { // microseconds
            fmt.Println("Long Elapsed Time: " + dialed_time.String() + ", " + elapsed_time.String())
        }
        *time_taken = float64(elapsed_time.Microseconds())
    }()

    text := "{\"service\": \"magic_service_str\"}"
    c.Write([]byte(text))
    c.Read(message) // Blocks until the server replies (or closes); the reply is ignored.
}

func main() {
    main_start := time.Now()
    arguments := os.Args
    if len(arguments) == 1 {
        fmt.Println("Please provide host:port.")
        return
    }
    n_messages := 50
    wg := new(sync.WaitGroup)
    wg.Add(n_messages)
    times := make([]float64, n_messages)
    for i := 0; i < n_messages; i++ {
        // Used to turn the goroutines into serial implementation
        // time.Sleep(5500 * time.Microsecond)
        go do_message(wg, arguments[1], &times[i])
    }
    wg.Wait()
    avg, _ := stats.Mean(times)
    std, _ := stats.StandardDeviation(times)
    fmt.Printf("Time taken stats: %.2f +/- %.2f s\n", avg/1000000.0, std/1000000.0)
    main_taken := time.Since(main_start)
    fmt.Println("Main Taken: " + main_taken.String())
}
```

Update: added a pthreaded C client, which confirms the issue is not in the Golang implementation:

```c
// gcc client_p.c -o pclient -lpthread
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>
#include <errno.h>

#ifndef THREAD_LOOP_COUNT
#define THREAD_LOOP_COUNT 1
#endif

/* Subtract the 'struct timeval' values X and Y,
   storing the result in RESULT.
   Return 1 if the difference is negative, otherwise 0.
   https://www.gnu.org/software/libc/manual/html_node/Calculating-Elapsed-Time.html
*/
int
timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y)
{
  /* Perform the carry for the later subtraction by updating y. */
  if (x->tv_usec < y->tv_usec) {
    int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
    y->tv_usec -= 1000000 * nsec;
    y->tv_sec += nsec;
  }
  if (x->tv_usec - y->tv_usec > 1000000) {
    int nsec = (x->tv_usec - y->tv_usec) / 1000000;
    y->tv_usec += 1000000 * nsec;
    y->tv_sec -= nsec;
  }

  /* Compute the time remaining to wait.
     tv_usec is certainly positive. */
  result->tv_sec = x->tv_sec - y->tv_sec;
  result->tv_usec = x->tv_usec - y->tv_usec;

  /* Return 1 if result is negative. */
  return x->tv_sec < y->tv_sec;
}

static void* workerThreadFunc(void* arg)
{
    (void)arg; // unused
    int socket_desc;
    struct sockaddr_in server_addr;
    char server_message[2000], client_message[2000];
    // Clean buffers and set the message to send:
    memset(server_message, '\0', sizeof(server_message));
    memset(client_message, '\0', sizeof(client_message));
    strcpy(client_message, "client message.");
    // Set port and IP the same as server-side:
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(46999);
    server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    int retries = 0;
    struct timeval start, end, difference;
    for(int i = 0; i < THREAD_LOOP_COUNT; i++)
    {
        gettimeofday(&start, NULL);
        // Create socket:
        socket_desc = socket(AF_INET, SOCK_STREAM, 0);
        if(socket_desc < 0){
            printf("Unable to create socket\n");
            return NULL;
        }
        // Send connection request to server, retrying on failure:
        while(connect(socket_desc, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0){
            retries++;
            if (retries > 10)
            {
                printf("Unable to connect\n");
                retries = 0;
            }
            usleep(5);
        }
        retries = 0;

        // Send the message to server:
        if(send(socket_desc, client_message, strlen(client_message), 0) < 0){
            printf("Unable to send message\n");
            close(socket_desc);
            return NULL;
        }

        // Receive the server's response:
        if(recv(socket_desc, server_message, sizeof(server_message), 0) < 0){
            printf("Error while receiving server's msg\n");
            close(socket_desc);
            return NULL;
        }

        // Close the socket:
        close(socket_desc);
        gettimeofday(&end, NULL);
        timeval_subtract(&difference, &end, &start);
        double cpu_time_used = (double)difference.tv_sec + (double)difference.tv_usec / 1000000.0;
        printf("Client Time: %.4e s\n", cpu_time_used);
    }
    return NULL;
}

int main(int argc, char **argv)
{
    int n_threads = 50;  // default value
    if (argc > 1)
        n_threads = atoi(argv[1]);

    pthread_t *threads = (pthread_t*)malloc(n_threads * sizeof(pthread_t));

    struct timeval start, end, difference;
    gettimeofday(&start, NULL);
    for(int i = 0; i < n_threads; i++)
    {
        int createRet = pthread_create(&threads[i], NULL, workerThreadFunc, NULL);
        if (createRet != 0)
        {
            printf("failed to create thread\n");
        }
    }
    for(int i = 0; i < n_threads; i++)
        pthread_join(threads[i], NULL);
    gettimeofday(&end, NULL);
    timeval_subtract(&difference, &end, &start);
    double cpu_time_used = (double)difference.tv_sec + (double)difference.tv_usec / 1000000.0;
    printf("Total Client Time: %.4e s\n", cpu_time_used);
    free(threads);
    return 0;
}
```

bbg*_*bbg 0

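As @user207421 pointed out, the problem lies in the TCP implementation, which uses exponential backoff on retries. Neither Golang nor C seems to offer an easy way to change this behavior.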

The answer: if you need high throughput, don't open and close a TCP connection per request; use a connection pool.
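Here is a minimal Go sketch of that advice. It rests on several assumptions. `ConnPool`, `NewConnPool`, `ErrPoolClosed`, and `demoReuse` are hypothetical names, not a library API. The pool assumes a server that keeps connections open across requests; the `server.c` above closes each connection after one reply, so it would need to change as well. Finally, the pool does not health-check idle connections.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
)

// ErrPoolClosed is returned by Get once Close has been called.
var ErrPoolClosed = errors.New("pool closed")

// ConnPool (hypothetical) keeps idle TCP connections to one address for reuse.
type ConnPool struct {
	addr   string
	idle   chan net.Conn // buffered; capacity caps the number of idle conns
	mu     sync.Mutex    // guards closed and the closing of idle
	closed bool
}

func NewConnPool(addr string, maxIdle int) *ConnPool {
	return &ConnPool{addr: addr, idle: make(chan net.Conn, maxIdle)}
}

// Get reuses an idle connection if there is one, otherwise dials a new one.
func (p *ConnPool) Get() (net.Conn, error) {
	select {
	case c, ok := <-p.idle:
		if !ok {
			return nil, ErrPoolClosed
		}
		return c, nil
	default:
		return net.Dial("tcp", p.addr)
	}
}

// Put hands a connection back, or closes it if the pool is full or closed.
func (p *ConnPool) Put(c net.Conn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		c.Close()
		return
	}
	select {
	case p.idle <- c:
	default:
		c.Close()
	}
}

// Close closes all idle connections and stops further reuse.
func (p *ConnPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return
	}
	p.closed = true
	close(p.idle)
	for c := range p.idle {
		c.Close()
	}
}

// demoReuse checks, against a throwaway local server, that Put then Get reuses a conn.
func demoReuse() (bool, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	defer ln.Close()
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			go func(c net.Conn) { io.Copy(io.Discard, c); c.Close() }(conn)
		}
	}()
	pool := NewConnPool(ln.Addr().String(), 4)
	defer pool.Close()
	c1, err := pool.Get()
	if err != nil {
		return false, err
	}
	pool.Put(c1)
	c2, err := pool.Get()
	if err != nil {
		return false, err
	}
	defer pool.Put(c2)
	return c1 == c2, nil
}

func main() {
	reused, err := demoReuse()
	fmt.Println("connection reused:", reused, "error:", err) // connection reused: true error: <nil>
}
```

The buffered channel serves as the idle list, and its capacity limits how many idle connections are kept. Only the first `Get` pays the dial cost, so later requests never touch the listen backlog. A production pool would also want health checks and idle timeouts.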

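Some work has looked into removing exponential backoff (see the reference below), but for specific cases there may be better solutions. There was one for me.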

"Removing Exponential Backoff from TCP", ACM SIGCOMM Computer Communication Review, Vol. 38, No. 5, October 2008.