Dai*_*Dai 5 c# asynchronous async-await
我的代码处理到远程主机的TCP连接,ConcurrentQueue用于存储传出消息.它打算在单个线程中运行.连接的生命周期包含在内,RunAsync而单独的对象包含连接的"公共状态":
class PublicState
{
internal readonly ConcurrentQueue<Message> OutgoingMessageQueue = new ConcurrentQueue<Message>();
internal TaskCompletionSource<Object> OutgoingMessageTcs = null;
internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();
public void EnqueueMessages(IEnumerable<Message> messages)
{
foreach( Message m in messages ) this.OutgoingMessageQueue.Enqueue( m);
if( this.OutgoingMessageTcs == null ) this.OutgoingMessageTcs = new TaskCompletionSource<Object>();
this.OutgoingMessageTcs.SetResult( null );
}
}
static async Task RunAsync(IPEndPoint endPoint, PublicState state)
{
using( TcpClient tcp = new TcpClient() )
{
await tcp.ConnectAsync( endPoint.Address, endPoint.Port ).ConfigureAwait(false);
Byte[] reusableBuffer = new Byte[ 4096 ];
using( NetworkStream ns = tcp.GetStream() )
{
state.ConnectedTcs.SetResult( null );
Task<Int32> nsReadTask = null;
while( tcp.Connected )
{
if( !state.writeQueue.IsEmpty )
{
await WriteMessagesAsync( ... ).ConfigureAwait( false );
}
if( ns.DataAvailable )
{
await ReadMessagesAsync( ... ).ConfigureAwait( false );
}
// Wait for new data to arrive from remote host or for new messages to send:
if( state.OutgoingMessageTcs == null ) state.OutgoingMessageTcs = new TaskCompletionSource<Object>();
if( nsReadTask == null ) nsReadTask = ns.ReadAsync( reusableBuffer, 0, 0 ).ConfigureAwait( false );
Task c = await Task.WhenAny( state.OutgoingMessageTcs, nsReadTask ).ConfigureAwait( false );
if( c == state.OutgoingMessageTcs.Task ) state.OutgoingMessageTcs = null;
else if( c == nsReadTask ) nsReadTask = null;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
像这样使用:
public async Task Main(String[] args)
{
PublicState state = new PublicState();
Task clientTask = Client.RunAsync( new IPEndPoint(args[0]), state );
await state.ConnectedTcs.Task; // awaits until TCP connection is established
state.EnqueueMessage( new Message("foo") );
state.EnqueueMessage( new Message("bar") );
state.EnqueueMessage( new Message("baz") );
await clientTask; // awaits until the TCP connection is closed
}
Run Code Online (Sandbox Code Playgroud)
这段代码可以工作,但我不喜欢它:感觉就像我正在使用TaskCompletionSource它代表一个实际的任务或某种背景操作,而我真的使用它TaskCompletionSource作为一种廉价EventWaitHandle.我没有使用,EventWaitHandle因为它IDisposable(我不想冒泄漏原生资源的风险),它缺乏一个WaitAsync或WaitOneAsync方法.我可以使用SemaphoreSlim(这是等待的,但包装一个EventWaitHandle)但我的代码并不真正代表一个信号量的良好使用.
我是否使用了TaskCompletionSource<T>可接受的,或者是否有更好的方法在RunAsync添加项目时"取消等待"执行OutgoingMessageQueue?
我觉得它"错误"的另一个原因是TaskCompletionSource<T>只能使用一次,然后需要更换.我渴望避免无关的分配.
如果我理解正确的话 - TPLBufferBlock可能就是您所需要的。当前的模拟量Enqueue为Post,可以通过ReceiveAsync扩展方法接收下一个值。
所以BufferBlock你的代码变成这样:
class PublicState {
internal readonly BufferBlock<Message> OutgoingMessageQueue = new BufferBlock<Message>();
internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();
public void EnqueueMessage(Message message) {
this.OutgoingMessageQueue.Post(message);
}
}
static async Task RunAsync(IPEndPoint endPoint, PublicState state) {
using (TcpClient tcp = new TcpClient()) {
await tcp.ConnectAsync(endPoint.Address, endPoint.Port).ConfigureAwait(false);
Byte[] reusableBuffer = new Byte[4096];
using (NetworkStream ns = tcp.GetStream()) {
state.ConnectedTcs.SetResult(null);
Task<Int32> nsReadTask = null;
Task<Message> newMessageTask = null;
while (tcp.Connected) {
// Wait for new data to arrive from remote host or for new messages to send:
if (nsReadTask == null)
nsReadTask = ns.ReadAsync(reusableBuffer, 0, 0);
if (newMessageTask == null)
newMessageTask = state.OutgoingMessageQueue.ReceiveAsync();
var completed = await Task.WhenAny(nsReadTask, newMessageTask).ConfigureAwait(false);
if (completed == newMessageTask) {
var result = await newMessageTask;
// do stuff
newMessageTask = null;
}
else {
var bytesRead = await nsReadTask;
nsReadTask = null;
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
OutgoingMessageTcs作为奖励,这个版本(我认为)是线程安全的,而您当前的版本不是,因为您正在使用潜在的多个线程(调用者的线程RunAsync和线程)执行非线程安全的操作EnqueueMessages。
如果由于某种原因您不喜欢BufferBlock- 您可以以完全相同的方式使用 nuget 包AsyncCollection。Nito.AsyncEx初始化变为:
internal readonly AsyncCollection<Message> OutgoingMessageQueue = new AsyncCollection<Message>(new ConcurrentQueue<Message>());
Run Code Online (Sandbox Code Playgroud)
并获取:
if (newMessageTask == null)
newMessageTask = state.OutgoingMessageQueue.TakeAsync();
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
290 次 |
| 最近记录: |