有效的信令任务TPL完成频繁重新启动事件

Dan*_*ant 9 .net c# task-parallel-library async-await

我正在研究一个模拟系统,除其他外,它允许在离散的模拟时间步骤中执行任务.执行都发生在模拟线程的上下文中,但是,从使用系统的"操作员"的角度来看,它们希望表现为异步.值得庆幸的是,TPL具有方便的'async/await'关键字,这使得它非常简单.我在模拟上有一个原始方法,如下所示:

    public Task CycleExecutedEvent()
    {
        lock (_cycleExecutedBroker)
        {
            if (!IsRunning) throw new TaskCanceledException("Simulation has been stopped");
            return _cycleExecutedBroker.RegisterForCompletion(CycleExecutedEventName);
        }
    }
Run Code Online (Sandbox Code Playgroud)

这基本上是创建一个新的TaskCompletionSource然后返回一个Task.此任务的目的是在模拟发生新的"ExecuteCycle"时执行其继续.

然后我有一些像这样的扩展方法:

    public static async Task WaitForDuration(this ISimulation simulation, double duration)
    {
        double startTime = simulation.CurrentSimulatedTime;
        do
        {
            await simulation.CycleExecutedEvent();
        } while ((simulation.CurrentSimulatedTime - startTime) < duration);
    }

    public static async Task WaitForCondition(this ISimulation simulation, Func<bool> condition)
    {
        do
        {
            await simulation.CycleExecutedEvent();
        } while (!condition());
    }
Run Code Online (Sandbox Code Playgroud)

这些非常方便,从"运营商"角度构建序列,根据条件采取行动并等待模拟时间段.我遇到的问题是CycleExecuted非常频繁地发生(如果我以完全加速的速度运行,大约每隔几毫秒).因为这些'wait'辅助方法在每个循环中注册一个新的'await',这会导致TaskCompletionSource实例中的大量转换.

我已经分析了我的代码,并且我发现大约5.5%的总CPU时间花费在这些完成中,其中"活动"代码中只花费了可忽略不计的百分比.实际上,所有时间都花在注册新的完成上,同时等待触发条件有效.

我的问题:我如何在保持async/await模式的便利性的同时提高写入"操作员行为"的性能?我认为我需要像轻量级和/或可重复使用的TaskCompletionSource这样的东西,因为触发事件经常发生.


我一直在做更多的研究,听起来一个很好的选择是创建Awaitable模式的自定义实现,它可以直接绑定到事件,消除了对一堆TaskCompletionSource和Task实例的需要.它在这里有用的原因是有很多不同的延续等待CycleExecutedEvent,他们需要经常等待它.理想情况下,我正在寻找一种方法来排队延续回调,然后在事件发生时回调队列中的所有内容.我会继续挖掘,但如果人们知道一个干净的方法,我欢迎任何帮助.


对于将来浏览这个问题的人来说,这是我放在一起的定制等待者:

public sealed class CycleExecutedAwaiter : INotifyCompletion
{
    private readonly List<Action> _continuations = new List<Action>();

    public bool IsCompleted
    {
        get { return false; }
    }

    public void GetResult()
    {
    }

    public void OnCompleted(Action continuation)
    {
        _continuations.Add(continuation);
    }

    public void RunContinuations()
    {
        var continuations = _continuations.ToArray();
        _continuations.Clear();
        foreach (var continuation in continuations)
            continuation();
    }

    public CycleExecutedAwaiter GetAwaiter()
    {
        return this;
    }
}
Run Code Online (Sandbox Code Playgroud)

在模拟器中:

    private readonly CycleExecutedAwaiter _cycleExecutedAwaiter = new CycleExecutedAwaiter();

    public CycleExecutedAwaiter CycleExecutedEvent()
    {
        if (!IsRunning) throw new TaskCanceledException("Simulation has been stopped");
        return _cycleExecutedAwaiter;
    }
Run Code Online (Sandbox Code Playgroud)

这有点好笑,因为等待者从未报告完整,但火灾继续称为完成注册; 不过,它适用于这个应用程序.这将CPU开销从5.5%降低到2.1%.它可能仍然需要一些调整,但它比原版有很好的改进.

svi*_*ick 8

await关键字不只是在工作的TaskS,它适用于任何遵循awaitable模式.有关详细信息,请参阅Stephen Toub的文章等待任何事情; .

简短版本是类型必须有一个方法GetAwaiter(),它返回一个实现的类型,INotifyCompletion并且还有IsCompleted属性和GetResult()方法(void如果await表达式不应该有值,则返回 - ).有关示例,请参阅TaskAwaiter.

如果你创建自己的等待,你可以每次返回相同的对象,避免分配许多TaskCompletionSources 的开销.


Mr.*_*oor 5

这是我的ReusableAwaiter模拟版本TaskCompletionSource

public sealed class ReusableAwaiter<T> : INotifyCompletion
{
    private Action _continuation = null;
    private T _result = default(T);
    private Exception _exception = null;

    public bool IsCompleted
    {
        get;
        private set;
    }

    public T GetResult()
    {
        if (_exception != null)
            throw _exception;
        return _result;
    }

    public void OnCompleted(Action continuation)
    {
        if (_continuation != null)
            throw new InvalidOperationException("This ReusableAwaiter instance has already been listened");
        _continuation = continuation;
    }

    /// <summary>
    /// Attempts to transition the completion state.
    /// </summary>
    /// <param name="result"></param>
    /// <returns></returns>
    public bool TrySetResult(T result)
    {
        if (!this.IsCompleted)
        {
            this.IsCompleted = true;
            this._result = result;

            if (_continuation != null)
                _continuation();
            return true;
        }
        return false;
    }

    /// <summary>
    /// Attempts to transition the exception state.
    /// </summary>
    /// <param name="result"></param>
    /// <returns></returns>
    public bool TrySetException(Exception exception)
    {
        if (!this.IsCompleted)
        {
            this.IsCompleted = true;
            this._exception = exception;

            if (_continuation != null)
                _continuation();
            return true;
        }
        return false;
    }

    /// <summary>
    /// Reset the awaiter to initial status
    /// </summary>
    /// <returns></returns>
    public ReusableAwaiter<T> Reset()
    {
        this._result = default(T);
        this._continuation = null;
        this._exception = null;
        this.IsCompleted = false;
        return this;
    }

    public ReusableAwaiter<T> GetAwaiter()
    {
        return this;
    }
}
Run Code Online (Sandbox Code Playgroud)

这是测试代码。

class Program
{
    static readonly ReusableAwaiter<int> _awaiter = new ReusableAwaiter<int>();

    static void Main(string[] args)
    {
        Task.Run(() => Test());

        Console.ReadLine();
        _awaiter.TrySetResult(22);
        Console.ReadLine();
        _awaiter.TrySetException(new Exception("ERR"));

        Console.ReadLine();
    }

    static async void Test()
    {

        int a = await AsyncMethod();
        Console.WriteLine(a);
        try
        {
            await AsyncMethod();
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

    }

    static  ReusableAwaiter<int> AsyncMethod()
    {
        return _awaiter.Reset();
    }

}
Run Code Online (Sandbox Code Playgroud)