实现可重试块的正确完成

Ale*_*lex 53 c# task-parallel-library tpl-dataflow

预告:伙计们,这个问题不是关于如何实施重试政策.这是关于正确完成TPL数据流块.

这个问题主要是我之前在ITargetBlock中重试策略的问题的延续.这个问题的答案是@ svick使用TransformBlock(源)和TransformManyBlock(目标)的智能解决方案.剩下的唯一问题是以正确的方式完成此块:等待所有重试首先完成,然后完成目标块.这是我最终得到的结果(它只是一个片段,不要过多关注非线程安全retries集):

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    while (target.InputCount > 0 || retries.Any())
        await Task.Delay(100);

    target.Complete();
});
Run Code Online (Sandbox Code Playgroud)

我们的想法是执行某种轮询并验证是否仍有等待处理的消息,并且没有消息需要重试.但在这个解决方案中,我不喜欢轮询的想法.

是的,我可以将添加/删除重试的逻辑封装到一个单独的类中,甚至例如在重试集变空时执行某些操作,但是如何处理target.InputCount > 0条件?当没有块的待处理消息时,没有这样的回调被调用,所以看起来target.ItemCount在一个具有小延迟的循环中进行验证是唯一的选择.

有没有人知道更聪明的方法来实现这一目标?

Lor*_*tté 1

结合 hwcverwe 答案和 JamieSee 评论可能是理想的解决方案。

首先,您需要创建多个事件:

var signal  = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);
Run Code Online (Sandbox Code Playgroud)

然后,您必须创建一个观察者,并订阅TransformManyBlock,以便在相关事件发生时您会收到通知:

var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);
Run Code Online (Sandbox Code Playgroud)

可观察到的内容非常简单:

private class RetryingBlockObserver<T> : IObserver<T> {
        private ManualResetEvent completedEvent;

        public RetryingBlockObserver(ManualResetEvent completedEvent) {                
            this.completedEvent = completedEvent;
        }

        public void OnCompleted() {
            completedEvent.Set();
        }

        public void OnError(Exception error) {
            //TODO
        }

        public void OnNext(T value) {
            //TODO
        }
    }
Run Code Online (Sandbox Code Playgroud)

您可以等待信号或完成(耗尽所有源项目),或两者都等待

 source.Completion.ContinueWith(async _ => {

            WaitHandle.WaitAll(completedEvent, signal);
            // Or WaitHandle.WaitAny, depending on your needs!

            target.Complete();
        });
Run Code Online (Sandbox Code Playgroud)

您可以检查 WaitAll 的结果值以了解设置了哪个事件,并做出相应的反应。您还可以将其他事件添加到代码中,将它们传递给观察者,以便观察者可以在需要时设置它们。例如,您可以区分自己的行为并在出现错误时做出不同的响应