Dan*_*ant 8 .net c# asynchronous tpl-dataflow
我正在研究一个系统,它涉及通过TCP网络连接接受命令,然后在执行这些命令时发送响应.相当基本的东西,但我希望支持一些要求:
我想使用async/await干净地实现它,并且基于我所读到的,TPL Dataflow听起来像是一种干净地将处理分解为可以在线程池上运行而不是绑定线程的漂亮块的好方法对于不同的会话/命令,阻塞等待句柄.
这就是我的开始(一些部分被剥离以简化,例如异常处理的细节;我还省略了一个为网络I/O提供有效等待的包装器):
private readonly Task _serviceTask;
private readonly Task _commandsTask;
private readonly CancellationTokenSource _cancellation;
private readonly BufferBlock<Command> _pendingCommands;
public NetworkService(ICommandProcessor commandProcessor)
{
_commandProcessor = commandProcessor;
IsRunning = true;
_cancellation = new CancellationTokenSource();
_pendingCommands = new BufferBlock<Command>();
_serviceTask = Task.Run((Func<Task>)RunService);
_commandsTask = Task.Run((Func<Task>)RunCommands);
}
public bool IsRunning { get; private set; }
private async Task RunService()
{
_listener = new TcpListener(IPAddress.Any, ServicePort);
_listener.Start();
while (IsRunning)
{
Socket client = null;
try
{
client = await _listener.AcceptSocketAsync();
client.Blocking = false;
var session = RunSession(client);
lock (_sessions)
{
_sessions.Add(session);
}
}
catch (Exception ex)
{ //Handling here...
}
}
}
private async Task RunCommands()
{
while (IsRunning)
{
var command = await _pendingCommands.ReceiveAsync(_cancellation.Token);
var task = Task.Run(() => RunCommand(command));
}
}
private async Task RunCommand(Command command)
{
try
{
var response = await _commandProcessor.RunCommand(command.Content);
Send(command.Client, response);
}
catch (Exception ex)
{
//Deal with general command exceptions here...
}
}
private async Task RunSession(Socket client)
{
while (client.Connected)
{
var reader = new DelimitedCommandReader(client);
try
{
var content = await reader.ReceiveCommand();
_pendingCommands.Post(new Command(client, content));
}
catch (Exception ex)
{
//Exception handling here...
}
}
}
Run Code Online (Sandbox Code Playgroud)
基础知识看起来很简单,但有一部分让我失望:如何确保在关闭应用程序时,我等待所有挂起的命令任务完成?当我使用Task.Run执行命令时,我得到Task对象,但是如何跟踪挂起的命令,以便在允许服务关闭之前确保所有命令都完整?
我已经考虑过使用一个简单的List,当List完成后从List中删除命令,但我想知道我是否缺少TPL Dataflow中的一些基本工具,这些工具可以让我更干净地完成这项工作.
编辑:
阅读有关TPL Dataflow的更多信息,我想知道我应该使用的是一个具有增加的MaxDegreeOfParallelism 的TransformBlock,以允许处理并行命令吗?这设置了可并行运行的命令数量的上限,但我认为这对我的系统来说是一个合理的限制.我很想听听那些有TPL Dataflow经验的人知道我是否走在正确的轨道上.
是的,所以...你有一半在这里使用TPL的力量.如果您正在订阅TPL DataFlow样式,那么您仍然BufferBlock在后台手动接收来自后台循环的项目Task并不是您想要的"方式".
你要做的是链接ActionBlock到BufferBlock并执行命令处理/发送.这也是您可以设置MaxDegreeOfParallelism控制要处理的并发命令数的块.这样安装可能看起来像这样:
// Initialization logic to build up the TPL flow
_pendingCommands = new BufferBlock<Command>();
_commandProcessor = new ActionBlock<Command>(this.ProcessCommand);
_pendingCommands.LinkTo(_commandProcessor);
private Task ProcessCommand(Command command)
{
var response = await _commandProcessor.RunCommand(command.Content);
this.Send(command.Client, response);
}
Run Code Online (Sandbox Code Playgroud)
然后,在您的关闭代码中,您需要发出信号,表示您已经完成了将项目添加到管道Complete中_pipelineCommands BufferBlock,然后通过调用,然后等待_commandProcessor ActionBlock完成以确保所有项目都已通过管道.你可以通过获取Task块的Completion属性返回并调用Wait它来执行此操作:
_pendingCommands.Complete();
_commandProcessor.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)
如果你想获得奖励积分,你甚至可以将命令处理与命令发送分开.这将允许您将这些步骤彼此分开配置.例如,您可能需要限制处理命令的线程数,但希望有更多的发送响应.你可以通过简单地TransformBlock在流程的中间引入一个来做到这一点:
_pendingCommands = new BufferBlock<Command>();
_commandProcessor = new TransformBlock<Command, Tuple<Client, Response>>(this.ProcessCommand);
_commandSender = new ActionBlock<Tuple<Client, Response>(this.SendResponseToClient));
_pendingCommands.LinkTo(_commandProcessor);
_commandProcessor.LinkTo(_commandSender);
private Task ProcessCommand(Command command)
{
var response = await _commandProcessor.RunCommand(command.Content);
return Tuple.Create(command, response);
}
private Task SendResponseToClient(Tuple<Client, Response> clientAndResponse)
{
this.Send(clientAndResponse.Item1, clientAndResponse.Item2);
}
Run Code Online (Sandbox Code Playgroud)
您可能希望使用自己的数据结构而不是Tuple,它仅用于说明目的,但关键是这正是您想要用来分解管道的那种结构,以便您可以准确地控制它的各个方面你可能需要怎样.