在actor和异常中调用异步API

Pat*_*iek 9 c# akka.net

我知道PipeTo,但有些东西,比如同步等待嵌套延续,似乎违背异步和等待方式.

所以,我的第一个问题[1]将是:这里是否存在"魔法",以便我们可以在延续中同步等待嵌套任务并且最终仍然是异步的?

当我们处于异步并等待差异时,如何处理失败?

让我们创建一个简单的例子:

public static class AsyncOperations
{
    public async static Task<int> CalculateAnswerAsync()
    {
        await Task.Delay(1000).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
        //return 42;
    }

    public async static Task<string> ConvertAsync(int number)
    {
        await Task.Delay(600).ConfigureAwait(false);
        return number + " :)";
    }
}
Run Code Online (Sandbox Code Playgroud)

在'常规',异步和等待方式:

var answer = await AsyncOperations.CalculateAnswerAsync();
var converted = await AsyncOperations.ConvertAsync(answer);
Run Code Online (Sandbox Code Playgroud)

正如您所期望的那样,例外将从第一次操作中冒出来.

现在,让我们创建一个将与这些异步操作一起工作的actor.对于争论的缘故,让我们说,CalculateAnswerAsyncConvertAsync应使用一个又一个为一体,富有操作(类似,例如,StreamWriter.WriteLineAsync并且StreamWriter.FlushAsync如果你只想写一行到一个流).

public sealed class AsyncTestActor : ReceiveActor
{
    public sealed class Start
    {
    }

    public sealed class OperationResult
    {
        private readonly string message;

        public OperationResult(string message)
        {
            this.message = message;
        }

        public string Message
        {
            get { return message; }
        }
    }

    public AsyncTestActor()
    {
        Receive<Start>(msg =>
               {
                   AsyncOperations.CalculateAnswerAsync()
                     .ContinueWith(result =>
                            {
                                var number = result.Result;
                                var conversionTask = AsyncOperations.ConvertAsync(number);
                                conversionTask.Wait(1500);
                                return new OperationResult(conversionTask.Result);
                            })
                     .PipeTo(Self);
                });
        Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message));
    }
}
Run Code Online (Sandbox Code Playgroud)

如果没有例外,我仍然Got 42 :)没有任何问题,这让我回到[1]以上的"神奇"点.此外,示例中提供的AttachedToParentExecuteSynchronously标志是可选的,还是他们几乎需要让一切按预期工作?它们似乎对异常处理没有任何影响......

现在,如果CalculateAnswerAsync抛出异常,这意味着result.Result抛出AggregateException,它几乎被吞没而没有痕迹.

我应该在这里做什么,如果有可能的话,在异步操作中使异常崩溃,将actor作为"常规"异常会崩溃?

Aar*_*web 10

TPL中错误处理的乐趣:)

一旦Task开始在自己的线程上运行,其中发生的一切都已经与调用者异步 - 包括错误处理

  1. 当你Task在演员的第一个内部开始时,该任务独立于ThreadPool你的演员.这意味着你在里面做的任何东西Task都已经与你的actor异步 - 因为它在不同的线程上运行.这就是为什么我在Task.WaitPipeTo帖子顶部链接到示例中进行了调用.对演员没有任何影响 - 它看起来像是一个长期运行的任务.
  2. 例外 - 如果您的内部任务失败,该conversionTask.Result属性将抛出在其运行期间捕获的异常,因此您需要在您的内部添加一些错误处理,Task以确保您的actor被通知出现问题.请注意我在这里做了这个:https://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117 - 如果你把你的异常变成消息你的演员可以处理:鸟儿开始唱歌,彩虹闪耀,TPL错误不再是痛苦和痛苦的根源.
  3. 至于抛出异常时会发生什么......

现在,如果CalculateAnswerAsync抛出异常,这意味着result.Result抛出AggregateException,它几乎被吞没而没有跟踪.

AggregateException将包含内部异常包裹它内部的名单-第三方物流具有总误差的概念是在事件的原因是:(a)你有一个任务,那就是在总量,即多任务的延续Task.WhenAll或(b)你有错误传播到ContinueWith链回到父.您还可以调用该AggregateException.Flatten()调用,以便更轻松地管理嵌套异常.

TPL + Akka.NET的最佳实践

处理TPL中的异常是一件令人讨厌的事情,这是正确的 - 但处理它的最好方法是将try..catch..内部异常Task转换为你的演员可以处理的消息类.

另外,在示例可选中提供了AttachedToParent和ExecuteSynchronously标志,还是他们几乎需要让所有内容按预期工作?

当你有连续的延续时,这主要是一个问题 - PipeTo自动使用这些标志.它对错误处理没有任何影响,但可确保您的连续在与原始连接相同的线程上立即执行Task.

我建议只在你进行大量嵌套延续时才使用这些标志 - 一旦你进入深度超过1连续,TPL开始对如何安排你的任务采取一些自由(事实上,像OnlyOnCompleted这样的标志在更多时候停止被接受超过1续.)


Rog*_*son 7

只是为了增加亚伦所说的话.截至昨天,我们确实在使用Task调度程序时支持在actor内安全异步等待.

public class AsyncAwaitActor : ReceiveActor
{
    public AsyncAwaitActor()
    {
        Receive<string>(async m =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            Sender.Tell("done");
        });
    }
}

public class AskerActor : ReceiveActor
{
    public AskerActor(ActorRef other)
    {
        Receive<string>(async m =>
        {
            var res = await other.Ask(m);
            Sender.Tell(res);
        });
    }
}

public class ActorAsyncAwaitSpec : AkkaSpec
{
    [Fact]
    public async Task Actors_should_be_able_to_async_await_ask_message_loop()
    {
        var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>()
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Worker");
        //IMPORTANT: you must use the akka.actor.task-dispatcher
        //otherwise async await is not safe

        var asker = Sys.ActorOf(Props.Create(() => new AskerActor(actor))
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Asker");

        var res = await asker.Ask("something");
        Assert.Equal("done", res);
    }
}
Run Code Online (Sandbox Code Playgroud)

这不是我们的默认调度程序,因为它确实带来了性能/吞吐量的代价.如果触发阻塞任务(例如使用task.Wait()task.Result),也存在死锁的风险.因此,PipeTo模式仍然是首选方法,因为它更适合于actor模型.但是,如果您真的需要进行一些TPL集成,async await支持就可以作为额外的工具.

这个功能实际上是PipeTo在封面下使用.它将使每个任务继续并将其包装在特殊消息中并将该消息传递回actor并在actor自己的并发上下文中执行该任务.