Pol*_*ity 4 .net c# dataflow task-parallel-library tpl-dataflow
我正在通过移植一些旧的套接字代码以使用TPL数据流和新的异步功能来尝试TPL数据流。尽管API感觉很坚如磐石,但我的代码仍然最终变得混乱。我想知道我是否在这里错过了什么。
我的要求如下:一个套接字类公开:Open,Close,Send和Receive方法。全部都返回一个Task,因此是异步的。打开和关闭是原子的。尽管发送和接收一次只能处理1条命令,但它们可以彼此相邻工作。
从逻辑上讲,这使我进入下一个内部控制代码:
// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;
// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
Run Code Online (Sandbox Code Playgroud)
到目前为止一切都很好。我可以安全地将动作发送到Send和Receive块,而不必担心与此同时正在运行的与连接有关的动作。此外,ActionBlock还确保发送的多个调用是同步的(接收,关闭和打开的对象)。
问题是动作没有简单的方法将任务传达回海报。现在,我正在使用TaskCompletionSource将结果传达回去。喜欢:
public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>();
sendBlock.Post(async () =>
{
if (!tcpClient.Connected)
throw new InvalidOperationException("Cant send when not open");
else
{
await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
resultCompletionSource.SetResult(null);
}
});
return resultCompletionSource.Task;
}
Run Code Online (Sandbox Code Playgroud)
感觉很难看又笨拙。我的问题是:有没有一种方法可以使用TPL同步工作流,而不必使用TaskCompletionSource在两者之间进行通信?
谢谢!
首先,您根本不需要TPL数据流,因为您实际上没有任何数据流。
其次,使用TaskSchedulers这样也不是正确的解决方案。TaskScheduler的调度代码,但是当您执行await某些操作时,没有代码在运行。因此,在WriteAsync()执行异步工作时,Open()可以运行的代码。
您实际需要的是类似的东西ReaderWriterLock,但与结合使用效果很好async。框架中没有类似的东西,但是您可以使用Stephen Toub的文章构建异步协调基元,第7部分:AsyncReaderWriterLock中的代码,它完全可以满足您的需求。本文还更详细地说明了为什么使用a TaskScheduler是错误的。
使用AsyncReaderWriterLock,您的代码可能如下所示:
public async Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
using (await readerWriterLock.ReaderLockAsync())
{
if (!tcpClient.Connected)
throw new InvalidOperationException("Can't send when not open");
await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2190 次 |
| 最近记录: |