全部,
我正在使用更改源处理器库。想知道处理服务故障以及 ProcessChangesAsync 方法中的异常/错误场景的最佳方法。以下是我所指的事件。
1) 服务失败 - 具有处理器库的服务在某些操作过程中崩溃。如何从同一文档(失败实例上的文档)启动该流程?是否有任何内置机制可以使更改源从最后一个失败的文档开始?例如,假设,在当前批次中,我们成功处理了 10 个 docs.5,然后由于网络故障或其他原因导致服务中断。一旦服务重新启动,我的流程会从第 6 个文档开始吗?如何实现这一目标?
2) 异常和错误- ProcessChangesAsync 方法中的任何错误都可以在全局级别使用 try catch 进行处理,但如何保留这些失败记录并使它们可用于下一批?再次,在变更提要过程中寻找任何可用的内置机制。
1) 默认情况下,处理器库在成功运行ProcessChangesAsync. 在最新的库版本中,您可以自定义检查点以在需要时执行手动检查点。如果由于某种原因处理器在检查点之前关闭,那么它将从 Leases 集合中存储的最后一个成功的检查点开始下一步处理。在您的情况下,它将再次从第一个文档开始,因此您永远不会丢失更改,但您可能会经历双重处理(这是“至少一次”模型)。
2) 没有可供您利用的内置机制,处理异常ProcessChangesAsync是您的责任。您不仅可以添加全局 try/catch,而且在循环遍历文档的情况下,可以在循环内添加 try/catch,以处理失败的文档(也许将其发送到队列以供以后分析/后处理)而不丢失批次。如果您需要记录这些错误(我假设这就是您所说的持久错误的意思?),那么最新版本与LibLog兼容,因此插入您自己的自定义日志记录非常简单:
using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;
var hostName = "SampleHost";
var tracelogProvider = new TraceLogProvider(); //You can use any provider supported by LibLog
using (tracelogProvider.OpenNestedContext(hostName))
{
LogProvider.SetCurrentLogProvider(tracelogProvider);
// After this, create IChangeFeedProcessor instance and start/stop it.
}
Run Code Online (Sandbox Code Playgroud)
评论的额外信息
为了避免异常停止批次或导致批次被重新处理,您可以进行如下处理:
public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> documents, CancellationToken cancellationToken)
{
try
{
foreach(var document in documents)
{
try
{
// Do your work for the document
}
catch(Exception ex)
{
// Something happened with the current document, handle it, send it to a queue / another storage to analyze, log it. This catch will make the loop continue with the next.
}
}
}
catch(Exception ex)
{
// Something unhandled happened, log it and avoid throwing it again so the next batch is processed
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
944 次 |
| 最近记录: |