我知道PipeTo,但有些东西,比如同步等待嵌套延续,似乎违背异步和等待方式.
所以,我的第一个问题[1]将是:这里是否存在"魔法",以便我们可以在延续中同步等待嵌套任务并且最终仍然是异步的?
当我们处于异步并等待差异时,如何处理失败?
让我们创建一个简单的例子:
public static class AsyncOperations
{
public async static Task<int> CalculateAnswerAsync()
{
await Task.Delay(1000).ConfigureAwait(false);
throw new InvalidOperationException("Testing!");
//return 42;
}
public async static Task<string> ConvertAsync(int number)
{
await Task.Delay(600).ConfigureAwait(false);
return number + " :)";
}
}
Run Code Online (Sandbox Code Playgroud)
在'常规',异步和等待方式:
var answer = await AsyncOperations.CalculateAnswerAsync();
var converted = await AsyncOperations.ConvertAsync(answer);
Run Code Online (Sandbox Code Playgroud)
正如您所期望的那样,例外将从第一次操作中冒出来.
现在,让我们创建一个将与这些异步操作一起工作的actor.对于争论的缘故,让我们说,CalculateAnswerAsync并ConvertAsync应使用一个又一个为一体,富有操作(类似,例如,StreamWriter.WriteLineAsync并且StreamWriter.FlushAsync如果你只想写一行到一个流).
public sealed class AsyncTestActor : ReceiveActor
{
public sealed class Start
{
}
public sealed class OperationResult
{
private readonly string message;
public OperationResult(string message)
{
this.message = message;
}
public string Message
{
get { return message; }
}
}
public AsyncTestActor()
{
Receive<Start>(msg =>
{
AsyncOperations.CalculateAnswerAsync()
.ContinueWith(result =>
{
var number = result.Result;
var conversionTask = AsyncOperations.ConvertAsync(number);
conversionTask.Wait(1500);
return new OperationResult(conversionTask.Result);
})
.PipeTo(Self);
});
Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message));
}
}
Run Code Online (Sandbox Code Playgroud)
如果没有例外,我仍然Got 42 :)没有任何问题,这让我回到[1]以上的"神奇"点.此外,示例中提供的AttachedToParent和ExecuteSynchronously标志是可选的,还是他们几乎需要让一切按预期工作?它们似乎对异常处理没有任何影响......
现在,如果CalculateAnswerAsync抛出异常,这意味着result.Result抛出AggregateException,它几乎被吞没而没有痕迹.
我应该在这里做什么,如果有可能的话,在异步操作中使异常崩溃,将actor作为"常规"异常会崩溃?
Aar*_*web 10
TPL中错误处理的乐趣:)
Task在演员的第一个内部开始时,该任务独立于ThreadPool你的演员.这意味着你在里面做的任何东西Task都已经与你的actor异步 - 因为它在不同的线程上运行.这就是为什么我在Task.WaitPipeTo帖子顶部链接到的示例中进行了调用.对演员没有任何影响 - 它看起来像是一个长期运行的任务.conversionTask.Result属性将抛出在其运行期间捕获的异常,因此您需要在您的内部添加一些错误处理,Task以确保您的actor被通知出现问题.请注意我在这里做了这个:https://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117 - 如果你把你的异常变成消息你的演员可以处理:鸟儿开始唱歌,彩虹闪耀,TPL错误不再是痛苦和痛苦的根源.现在,如果CalculateAnswerAsync抛出异常,这意味着result.Result抛出AggregateException,它几乎被吞没而没有跟踪.
在AggregateException将包含内部异常包裹它内部的名单-第三方物流具有总误差的概念是在事件的原因是:(a)你有一个任务,那就是在总量,即多任务的延续Task.WhenAll或(b)你有错误传播到ContinueWith链回到父.您还可以调用该AggregateException.Flatten()调用,以便更轻松地管理嵌套异常.
处理TPL中的异常是一件令人讨厌的事情,这是正确的 - 但处理它的最好方法是将try..catch..内部异常Task转换为你的演员可以处理的消息类.
另外,在示例可选中提供了AttachedToParent和ExecuteSynchronously标志,还是他们几乎需要让所有内容按预期工作?
当你有连续的延续时,这主要是一个问题 - PipeTo自动使用这些标志.它对错误处理没有任何影响,但可确保您的连续在与原始连接相同的线程上立即执行Task.
我建议只在你进行大量嵌套延续时才使用这些标志 - 一旦你进入深度超过1连续,TPL开始对如何安排你的任务采取一些自由(事实上,像OnlyOnCompleted这样的标志在更多时候停止被接受超过1续.)
只是为了增加亚伦所说的话.截至昨天,我们确实在使用Task调度程序时支持在actor内安全异步等待.
public class AsyncAwaitActor : ReceiveActor
{
public AsyncAwaitActor()
{
Receive<string>(async m =>
{
await Task.Delay(TimeSpan.FromSeconds(1));
Sender.Tell("done");
});
}
}
public class AskerActor : ReceiveActor
{
public AskerActor(ActorRef other)
{
Receive<string>(async m =>
{
var res = await other.Ask(m);
Sender.Tell(res);
});
}
}
public class ActorAsyncAwaitSpec : AkkaSpec
{
[Fact]
public async Task Actors_should_be_able_to_async_await_ask_message_loop()
{
var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>()
.WithDispatcher("akka.actor.task-dispatcher"),
"Worker");
//IMPORTANT: you must use the akka.actor.task-dispatcher
//otherwise async await is not safe
var asker = Sys.ActorOf(Props.Create(() => new AskerActor(actor))
.WithDispatcher("akka.actor.task-dispatcher"),
"Asker");
var res = await asker.Ask("something");
Assert.Equal("done", res);
}
}
Run Code Online (Sandbox Code Playgroud)
这不是我们的默认调度程序,因为它确实带来了性能/吞吐量的代价.如果触发阻塞任务(例如使用task.Wait()或task.Result),也存在死锁的风险.因此,PipeTo模式仍然是首选方法,因为它更适合于actor模型.但是,如果您真的需要进行一些TPL集成,async await支持就可以作为额外的工具.
这个功能实际上是PipeTo在封面下使用.它将使每个任务继续并将其包装在特殊消息中并将该消息传递回actor并在actor自己的并发上下文中执行该任务.