kdg*_*ory 5

发送到 CloudWatch Logs 的每批事件都必须包含一个序列令牌(来自PutLogEvents API 文档):

{
   "logEvents": [ 
      { 
         "message": "string",
         "timestamp": number
      }
   ],
   "logGroupName": "string",
   "logStreamName": "string",
   "sequenceToken": "string"
}
Run Code Online (Sandbox Code Playgroud)

响应返回nextSequenceToken(再次来自 API 文档):

{
   "nextSequenceToken": "string",
   "rejectedLogEventsInfo": { 
      "expiredLogEventEndIndex": number,
      "tooNewLogEventStartIndex": number,
      "tooOldLogEventEndIndex": number
   }
}
Run Code Online (Sandbox Code Playgroud)

所以对您的问题的简短回答是:如果您有一个生产者写入流,您可以保存nextSequenceToken并使用它来填充sequenceToken您的下一个PutLogEvents请求。

更长的答案是,如果您有多个生产者,则不能使用此技术,因为生产者无权访问对另一个生产者请求的响应。相反,您必须在每个请求之前调用DescribeLogStreams。以下代码摘自我编写的 Java 日志框架(因此包括对此处未显示的函数的引用,并且可能包含语法错误,因为我省略了特定于日志库的内容):

/**
 *  This function retrieves the current information for a specific log stream.
 *  DescribeLogStreams is a paginated operation, which means that we have to
 *  be prepared for a large number of rows in the response, but since we're
 *  passing the full stream name as a prefix this should never happen.
 */
private LogStream findLogStream(String logGroupName, String logStreamName)
{
    DescribeLogStreamsRequest request = new DescribeLogStreamsRequest()
                                        .withLogGroupName(logGroupName)
                                        .withLogStreamNamePrefix(logStreamName);
    DescribeLogStreamsResult result;
    do
    {
        result = client.describeLogStreams(request);
        for (LogStream stream : result.getLogStreams())
        {
            if (stream.getLogStreamName().equals(logStreamName))
                return stream;
        }
        request.setNextToken(result.getNextToken());
    } while (result.getNextToken() != null);
    return null;
}

/**
 *  This function tries to send a batch of messages, retrieving the sequence
 *  number for each batch and handling the data race if another process has
 *  made that sequence number invalid.
 */
private List<LogMessage> attemptToSend(List<LogMessage> batch)
{
    if (batch.isEmpty())
        return batch;

    PutLogEventsRequest request = new PutLogEventsRequest()
                                  .withLogGroupName(config.logGroupName)
                                  .withLogStreamName(config.logStreamName)
                                  .withLogEvents(constructLogEvents(batch));

    for (int ii = 0 ; ii < 5 ; ii++)
    {
        LogStream stream = findLogStream();

        try
        {
            request.setSequenceToken(stream.getUploadSequenceToken());
            client.putLogEvents(request);
            return Collections.emptyList();
        }
        catch (InvalidSequenceTokenException ex)
        {
            stats.updateWriterRaceRetries();
            Utils.sleepQuietly(100);
            // continue retry loop
        }
        catch (DataAlreadyAcceptedException ex)
        {
            reportError("received DataAlreadyAcceptedException, dropping batch", ex);
            return Collections.emptyList();
        }
        catch (Exception ex)
        {
            reportError("failed to send batch", ex);
            return batch;
        }
    }

    reportError("received repeated InvalidSequenceTokenException responses -- increase batch delay?", null);
    stats.updateUnrecoveredWriterRaceRetries();
    return batch;
}
Run Code Online (Sandbox Code Playgroud)

您将从PutLogEvents请求中返回的大多数异常都是不可恢复的,因此此代码将忽略它们。InvalidSequenceTokenException但是,表示两个生产者之间存在竞争,并且另一个生产者能够在此生产者检索流描述和尝试写入其批次的时间之间写入一个批次。这不太可能,但可能,因此它进行了几次重试,然后拒绝该批次(它重新排队等待另一次尝试)。

响应的最后一部分可能对您很重要:CloudWatch 有关于批处理事件时间戳的规则(过去或未来不太远)。如果您的批次包含超出该范围的事件,它们将被删除,但其余事件将被添加到流中。您可以通过查看rejectedLogEventsInfo响应中的对象来查看是否发生这种情况,如果删除了任何记录,该对象将具有非零索引(对于日志记录框架,这不太可能发生,并且没有纠正它,所以我只是忽略那个响应值)。

  • 索要这样的令牌有何意义?为什么API需要我们发送呢? (2认同)