tom*_*tom 5 go amazon-sqs amazon-web-services consumer aws-sdk-go
AWS SDK version: v1.38.19
Go version: go1.15.7
Alpine 3.7
I'm using a standard queue, and I initialize the SQS connection at application startup like this:
// Connection is the shared connection to SQS
var Connection *sqs.SQS

// InitSQS initializes the AWS SQS connection
func InitSQS() {
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}))
	Connection = sqs.New(sess, &aws.Config{
		Region:     aws.String("eu-west-1"),
		DisableSSL: aws.Bool(true),
	})
}
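I disabled SSL because I get memory and CPU leaks when I use SSL in my application (by the way, my application is not open to the rest of the world; it is an internal service for my other applications).
This is the configuration I use to read messages from SQS: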
func ConsumeUpdateMessage(db *database.MySQLWrap, sqsApi queue.SQSAPI) error {
	result, err := sqsApi.ReceiveMessage(&sqs.ReceiveMessageInput{
		AttributeNames: []*string{
			aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
		},
		MessageAttributeNames: []*string{
			aws.String(sqs.QueueAttributeNameAll),
		},
		QueueUrl:            &qURL,
		MaxNumberOfMessages: aws.Int64(10),
		WaitTimeSeconds:     aws.Int64(20),
	})
	if err != nil {
		return fmt.Errorf("error on receiving the message from queue: %s", err.Error())
	}
	for _, msg := range result.Messages {
		// business logic
	}
	return err
}
This is how I call the ConsumeUpdateMessage function:
// InitializeUpdateMessage ..
func InitializeUpdateMessage(db *database.MySQLWrap, sqsApi queue.SQSAPI) {
	go func() {
		for {
			time.Sleep(500 * time.Millisecond)
			err := ConsumeUpdateMessage(db, sqsApi)
			if err != nil {
				log.Error(err)
				continue
			}
		}
	}()
}
But sometimes my consumer returns an error like this:
*awserr.baseError: RequestError: send request failed
caused by: Post "http://sqs.eu-west-1.amazonaws.com/": dial tcp xx.x.xx.xxx:80: i/o timeout
(Note: I wrote xx instead of sharing the IP address.)
I searched forums and elsewhere, but I couldn't find a solution to either of these two problems.
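This is a very late answer, but I ran into a similar problem and I think I found the cause: you set WaitTimeSeconds to 20 seconds for long polling (which is a good idea), but the HTTP client times out before that.
The documentation for WaitTimeSeconds says: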
// To avoid HTTP errors, ensure that the HTTP response timeout for ReceiveMessage
// requests is longer than the WaitTimeSeconds parameter. For example, with
// the Java SDK, you can set HTTP transport settings using the NettyNioAsyncHttpClient
// (https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.html)
// for asynchronous clients, or the ApacheHttpClient (https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.html)
// for synchronous clients.
At least, that is what it says as of v1.44.39 of aws-sdk-go.
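To make the rule from that comment concrete, here is a minimal sketch of a sanity check that compares a response-header timeout against the long-poll wait. The helper name `longPollTimeoutOK` is hypothetical and not part of aws-sdk-go; it only uses the standard library and assumes the timeout in question is the one set as `ResponseHeaderTimeout` on an `http.Transport`, where zero means no limit.

```go
package main

import (
	"fmt"
	"time"
)

// longPollTimeoutOK reports whether an HTTP response-header timeout leaves
// room for an SQS long poll of waitTimeSeconds. In net/http a zero
// ResponseHeaderTimeout means "no limit", so zero always passes.
// (Hypothetical helper for illustration; not an aws-sdk-go function.)
func longPollTimeoutOK(responseHeader time.Duration, waitTimeSeconds int64) bool {
	if responseHeader == 0 {
		return true
	}
	return responseHeader > time.Duration(waitTimeSeconds)*time.Second
}

func main() {
	const waitTimeSeconds = 20 // same value as WaitTimeSeconds in the question

	// Compare a 5-second timeout and a 25-second timeout against the wait.
	for _, timeout := range []time.Duration{5 * time.Second, 25 * time.Second} {
		fmt.Printf("ResponseHeaderTimeout=%v, WaitTimeSeconds=%d: ok=%v\n",
			timeout, waitTimeSeconds, longPollTimeoutOK(timeout, waitTimeSeconds))
	}
}
```

With a 20-second wait, a 5-second response-header timeout fails this check and a 25-second one passes.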
The following page explains how to configure a custom HTTP client: https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/custom-http.html
I'm copying all of the code from that page here for posterity:
type HTTPClientSettings struct {
	Connect          time.Duration
	ConnKeepAlive    time.Duration
	ExpectContinue   time.Duration
	IdleConn         time.Duration
	MaxAllIdleConns  int
	MaxHostIdleConns int
	ResponseHeader   time.Duration
	TLSHandshake     time.Duration
}

func NewHTTPClientWithSettings(httpSettings HTTPClientSettings) (*http.Client, error) {
	var client http.Client
	tr := &http.Transport{
		ResponseHeaderTimeout: httpSettings.ResponseHeader,
		Proxy:                 http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			KeepAlive: httpSettings.ConnKeepAlive,
			DualStack: true,
			Timeout:   httpSettings.Connect,
		}).DialContext,
		MaxIdleConns:          httpSettings.MaxAllIdleConns,
		IdleConnTimeout:       httpSettings.IdleConn,
		TLSHandshakeTimeout:   httpSettings.TLSHandshake,
		MaxIdleConnsPerHost:   httpSettings.MaxHostIdleConns,
		ExpectContinueTimeout: httpSettings.ExpectContinue,
	}

	// So client makes HTTP/2 requests
	err := http2.ConfigureTransport(tr)
	if err != nil {
		return &client, err
	}

	return &http.Client{
		Transport: tr,
	}, nil
}

httpClient, err := NewHTTPClientWithSettings(HTTPClientSettings{
	Connect:          5 * time.Second,
	ExpectContinue:   1 * time.Second,
	IdleConn:         90 * time.Second,
	ConnKeepAlive:    30 * time.Second,
	MaxAllIdleConns:  100,
	MaxHostIdleConns: 10,
	ResponseHeader:   5 * time.Second,
	TLSHandshake:     5 * time.Second,
})
if err != nil {
	fmt.Println("Got an error creating custom HTTP client:")
	fmt.Println(err)
	return
}

sess := session.Must(session.NewSession(&aws.Config{
	HTTPClient: httpClient,
}))

sqsClient := sqs.New(sess)
You will need to adjust the timeout settings. In particular, I set Connect, TLSHandshake, and ResponseHeader to 25 seconds, and I no longer get any errors.
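As a rough illustration of that change, the sketch below builds a standard-library transport whose connect, TLS handshake, and response-header timeouts are all the long-poll wait plus some headroom. The function name `newLongPollTransport` and the 5-second margin are assumptions made for this example (20 + 5 gives the 25 seconds mentioned above); the remaining values mirror the sample settings from the AWS page. It deliberately leaves out the SDK calls so that it runs on its own.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"time"
)

// newLongPollTransport returns a transport whose connect, TLS handshake and
// response-header timeouts all exceed the long-poll wait by a fixed margin.
// (Hypothetical helper for illustration; the margin is an assumption.)
func newLongPollTransport(waitTimeSeconds int64) *http.Transport {
	const margin = 5 * time.Second // assumed headroom on top of WaitTimeSeconds
	timeout := time.Duration(waitTimeSeconds)*time.Second + margin

	return &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   timeout, // connect timeout
			KeepAlive: 30 * time.Second,
		}).DialContext,
		TLSHandshakeTimeout:   timeout,
		ResponseHeaderTimeout: timeout,
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   10,
		IdleConnTimeout:       90 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}
}

func main() {
	tr := newLongPollTransport(20) // WaitTimeSeconds from the question
	httpClient := &http.Client{Transport: tr}

	// With aws-sdk-go v1, httpClient would be passed as
	// aws.Config{HTTPClient: httpClient} when creating the session.
	_ = httpClient

	fmt.Println("response header timeout:", tr.ResponseHeaderTimeout)
	fmt.Println("TLS handshake timeout:", tr.TLSHandshakeTimeout)
}
```

Deriving the timeouts from the wait time, rather than hard-coding 25 seconds, keeps the two values from drifting apart if WaitTimeSeconds is changed later.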