Cosmo ChangeFeed - 错误、异常和服务失败场景

pra*_*jan 5 azure-cosmosdb

全部,

我正在使用更改源处理器库。想知道处理服务故障以及 ProcessChangesAsync 方法中的异常/错误场景的最佳方法。以下是我所指的事件。

1) 服务失败 - 具有处理器库的服务在某些操作过程中崩溃。如何从同一文档(失败实例上的文档)启动该流程?是否有任何内置机制可以使更改源从最后一个失败的文档开始?例如,假设,在当前批次中,我们成功处理了 10 个 docs.5,然后由于网络故障或其他原因导致服务中断。一旦服务重新启动,我的流程会从第 6 个文档开始吗?如何实现这一目标?

2) 异常和错误- ProcessChangesAsync 方法中的任何错误都可以在全局级别使用 try catch 进行处理,但如何保留这些失败记录并使它们可用于下一批?再次,在变更提要过程中寻找任何可用的内置机制。

Mat*_*nta 3

1) 默认情况下,处理器库在成功运行ProcessChangesAsync. 在最新的库版本中,您可以自定义检查点以在需要时执行手动检查点。如果由于某种原因处理器在检查点之前关闭,那么它将从 Leases 集合中存储的最后一个成功的检查点开始下一步处理。在您的情况下,它将再次从第一个文档开始,因此您永远不会丢失更改,但您可能会经历双重处理(这是“至少一次”模型)。

2) 没有可供您利用的内置机制,处理异常ProcessChangesAsync是您的责任。您不仅可以添加全局 try/catch,而且在循环遍历文档的情况下,可以在循环内添加 try/catch,以处理失败的文档(也许将其发送到队列以供以后分析/后处理)而不丢失批次。如果您需要记录这些错误(我假设这就是您所说的持久错误的意思?),那么最新版本与LibLog兼容,因此插入您自己的自定义日志记录非常简单:

using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;

var hostName = "SampleHost";
var tracelogProvider = new TraceLogProvider(); //You can use any provider supported by LibLog
using (tracelogProvider.OpenNestedContext(hostName))
{
    LogProvider.SetCurrentLogProvider(tracelogProvider);
    // After this, create IChangeFeedProcessor instance and start/stop it.
}
Run Code Online (Sandbox Code Playgroud)

来源

评论的额外信息

为了避免异常停止批次或导致批次被重新处理,您可以进行如下处理:

public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> documents, CancellationToken cancellationToken)
{
    try
    {
        foreach(var document in documents)
        {
            try
            {
                // Do your work for the document
            }
            catch(Exception ex)
            {
                // Something happened with the current document, handle it, send it to a queue / another storage to analyze, log it. This catch will make the loop continue with the next.
            }

        }
    }
    catch(Exception ex)
    {
        // Something unhandled happened, log it and avoid throwing it again so the next batch is processed    
    }
}
Run Code Online (Sandbox Code Playgroud)