SQS 上接收消息时出现 i/o 超时问题以及 https 使用内存泄漏

tom*_*tom 5 go amazon-sqs amazon-web-services consumer aws-sdk-go

AWS 开发工具包版本:v1.38.19

Go版本:go1.15.7

高山3.7

我正在使用标准队列,我在我的应用程序中立即初始化 SQS 连接,如下所示;

// Connection connection to the SQS
var Connection *sqs.SQS

// InitSQS initialize the AWS SQS connection
func InitSQS() {
    sess := session.Must(session.NewSessionWithOptions(session.Options{
        SharedConfigState: session.SharedConfigEnable,
    }))

    Connection = sqs.New(sess, &aws.Config{
        Region:     aws.String("eu-west-1"),
        DisableSSL: aws.Bool(true),
    })
}
Run Code Online (Sandbox Code Playgroud)

我禁用 SSL 因为;当我在应用程序中使用 SSL 时,我遇到内存和 CPU 泄漏(我的应用程序不对世界其他地方开放,顺便说一句,它是我其他应用程序的内部服务)。

这是我用来从 SQS 读取消息的配置:

func ConsumeUpdateMessage(db *database.MySQLWrap, sqsApi queue.SQSAPI) error {
    result, err := sqsApi.ReceiveMessage(&sqs.ReceiveMessageInput{
        AttributeNames: []*string{
            aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
        },
        MessageAttributeNames: []*string{
            aws.String(sqs.QueueAttributeNameAll),
        },
        QueueUrl:            &qURL,
        MaxNumberOfMessages: aws.Int64(10),
        WaitTimeSeconds:     aws.Int64(20),
    })

    if err != nil {
        return fmt.Errorf("error on receiving the message from queue: %s", err.Error())
    }

    for _, msg := range result.Messages {
            // business logic
        }

    return err
}
Run Code Online (Sandbox Code Playgroud)

这就是我调用 ConsumeUpdateMessage 方法的方式;

// InitializeUpdateMessage ..
func InitializeUpdateMessage(db *database.MySQLWrap, sqsApi queue.SQSAPI) {
    go func() {
        for {
            time.Sleep(500 * time.Millisecond)

            err := ConsumeUpdateMessage(db, sqsApi)

            if err != nil {
                log.Error(err)

                continue
            }
        }
    }()
}
Run Code Online (Sandbox Code Playgroud)

但有时我的订阅者会返回这样的错误;

*awserr.baseError: RequestError: send request failed
caused by: Post "http://sqs.eu-west-1.amazonaws.com/": dial tcp xx.x.xx.xxx:80: i/o timeout
Run Code Online (Sandbox Code Playgroud)

(注:我输入的是 xx 而不是共享 IP 地址)

我在论坛和其他地方进行了搜索,但找不到这两个问题的解决方案,

  1. 当我使用 SSL 连接时出现内存泄漏(想法:但由于我的应用程序是内部服务,我认为我不必使用 SSL)
  2. 输入/输出超时

cbr*_*ier 0

这是一个很晚的答案,但我遇到了类似的问题,我想我发现了问题所在:您WaitTimeSeconds为长轮询设置了 20 秒(这是一个好主意),但 HTTP 客户端超时时间早于此。

的文档WaitTimeSeconds说:

// To avoid HTTP errors, ensure that the HTTP response timeout for ReceiveMessage
// requests is longer than the WaitTimeSeconds parameter. For example, with
// the Java SDK, you can set HTTP transport settings using the NettyNioAsyncHttpClient
// (https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.html)
// for asynchronous clients, or the ApacheHttpClient (https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.html)
// for synchronous clients.
Run Code Online (Sandbox Code Playgroud)

至少,v1.44.39截至aws-sdk-go.

以下页面介绍了如何配置自定义 HTTP 客户端:https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/custom-http.html

我将该页面的所有代码复制到此处,以供后代使用:

type HTTPClientSettings struct {
    Connect          time.Duration
    ConnKeepAlive    time.Duration
    ExpectContinue   time.Duration
    IdleConn         time.Duration
    MaxAllIdleConns  int
    MaxHostIdleConns int
    ResponseHeader   time.Duration
    TLSHandshake     time.Duration
}

func NewHTTPClientWithSettings(httpSettings HTTPClientSettings) (*http.Client, error) {
    var client http.Client
    tr := &http.Transport{
        ResponseHeaderTimeout: httpSettings.ResponseHeader,
        Proxy:                 http.ProxyFromEnvironment,
        DialContext: (&net.Dialer{
            KeepAlive: httpSettings.ConnKeepAlive,
            DualStack: true,
            Timeout:   httpSettings.Connect,
        }).DialContext,
        MaxIdleConns:          httpSettings.MaxAllIdleConns,
        IdleConnTimeout:       httpSettings.IdleConn,
        TLSHandshakeTimeout:   httpSettings.TLSHandshake,
        MaxIdleConnsPerHost:   httpSettings.MaxHostIdleConns,
        ExpectContinueTimeout: httpSettings.ExpectContinue,
    }

    // So client makes HTTP/2 requests
    err := http2.ConfigureTransport(tr)
    if err != nil {
        return &client, err
    }

    return &http.Client{
        Transport: tr,
    }, nil
}

httpClient, err := NewHTTPClientWithSettings(HTTPClientSettings{
    Connect:          5 * time.Second,
    ExpectContinue:   1 * time.Second,
    IdleConn:         90 * time.Second,
    ConnKeepAlive:    30 * time.Second,
    MaxAllIdleConns:  100,
    MaxHostIdleConns: 10,
    ResponseHeader:   5 * time.Second,
    TLSHandshake:     5 * time.Second,
})
if err != nil {
    fmt.Println("Got an error creating custom HTTP client:")
    fmt.Println(err)
    return
}

sess := session.Must(session.NewSession(&aws.Config{
    HTTPClient: httpClient,
}))

sqsClient := sqs.New(sess)
Run Code Online (Sandbox Code Playgroud)

您将需要配置超时设置。特别是,我将ConnectTLSHandshake、 和设置ResponseHeader为 25 秒,并且不再出现任何错误。