Los*_*les 9 c# multithreading asynchronous rabbitmq task-parallel-library
我一直在努力与一些异步等待的东西.我正在使用RabbitMQ在某些程序之间发送/接收消息.
作为一个背景,RabbitMQ客户端使用我可以看到的3个左右的线程:一个连接线程和两个心跳线程.每当通过TCP接收消息时,连接线程就会处理它并调用我通过接口提供的回调.文档说最好避免在这个调用期间做很多工作,因为它在与连接相同的线程上完成并且事情需要继续.它们提供了一个QueueingBasicConsumer具有阻塞'Dequeue'方法的方法,该方法用于等待接收消息.
我希望我的消费者能够在这个等待时间内实际释放他们的线程上下文,以便其他人可以做一些工作,所以我决定使用async/await任务.我写了一个以下列方式AwaitableBasicConsumer使用TaskCompletionSources 的类:
我有一个等待的Dequeue方法:
public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
//we are enqueueing a TCS. This is a "read"
rwLock.EnterReadLock();
try
{
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
//if we are cancelled before we finish, this will cause the tcs to become cancelled
cancellationToken.Register(() =>
{
tcs.TrySetCanceled();
});
//if there is something in the undelivered queue, the task will be immediately completed
//otherwise, we queue the task into deliveryTCS
if (!TryDeliverUndelivered(tcs))
deliveryTCS.Enqueue(tcs);
}
return tcs.Task;
}
finally
{
rwLock.ExitReadLock();
}
}
Run Code Online (Sandbox Code Playgroud)
rabbitmq客户端调用的回调完成任务:这是从AMQP Connection线程的上下文调用的
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
//we want nothing added while we remove. We also block until everybody is done.
rwLock.EnterWriteLock();
try
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
bool sent = false;
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
while (deliveryTCS.TryDequeue(out tcs))
{
//once we manage to actually set somebody's result, we are done with handling this
if (tcs.TrySetResult(e))
{
sent = true;
break;
}
}
//if nothing was sent, we queue up what we got so that somebody can get it later.
/**
* Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
* next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
* doing our thing here.
*/
if (!sent)
{
undelivered.Enqueue(e);
}
}
finally
{
rwLock.ExitWriteLock();
}
}
Run Code Online (Sandbox Code Playgroud)
rwLock是一个ReaderWriterLockSlim.两个队列(deliveryTCS和undelivered)是ConcurrentQueues.
问题:
每隔一段时间,等待dequeue方法的方法会抛出异常.这通常不是问题,因为该方法也是async如此,因此它进入任务进入的"异常"完成状态.问题出现在DequeueAsyncRabbitMQ客户端创建的AMQP连接线程上等待之后恢复调用的任务的情况.通常我已经看到任务恢复到主线程或其中一个浮动的工作线程.但是,当它恢复到AMQP线程并抛出异常时,一切都会停止.该任务不会进入其"异常状态",并且AMQP Connection线程会说它正在执行发生异常的方法.
我在这里的主要困惑是为什么这不起作用:
var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards
ConsumerTaskState state = new ConsumerTaskState()
{
Connection = connection,
CancellationToken = cancellationToken
};
//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);
Run Code Online (Sandbox Code Playgroud)
以下是RunAsync为测试设置的方法:
public async Task RunAsync()
{
using (var channel = this.Connection.CreateModel())
{
...
AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
var result = consumer.DequeueAsync(this.CancellationToken);
//wait until we find something to eat
await result;
throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
...
} //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}
Run Code Online (Sandbox Code Playgroud)
阅读我所写的内容,我发现我可能没有很好地解释我的问题.如果需要澄清,请问.
我提出的解决方案如下:
TaskScheduler决定使用它们.如果我能找到一种方法告诉任务调度程序那些线程是不受限制的,那就太好了.有没有人解释为什么会发生这种情况或解决这个问题的任何建议?现在我正在删除异步代码,以便程序可靠,但我真的想了解这里发生了什么.
我首先建议你阅读我的async介绍,它准确地解释了如何await捕获上下文并使用它来恢复执行.总之,它会捕捉当前SynchronizationContext(或当前TaskScheduler如果SynchronizationContext.Current是null).
另一个重要的细节是async连续安排TaskContinuationOptions.ExecuteSynchronously(如@svick在评论中指出).我有一篇关于此的博文,但AFAIK在任何地方都没有正式记录.这个细节确实使编写async生产者/消费者队列变得困难.
原因await不是"切换回原始上下文"(可能)因为RabbitMQ线程没有SynchronizationContext或者TaskScheduler- 因此,在调用时直接执行延续,TrySetResult因为这些线程看起来就像常规线程池线程.
顺便说一下,通读你的代码,我怀疑你使用读写器锁和并发队列是不正确的.没有看到整个代码我无法确定,但那是我的印象.
我强烈建议你使用一个现有的async队列并围绕它构建一个消费者(换句话说,让其他人做困难的部分:).TPL数据流中的BufferBlock<T>类型可以充当队列; 如果您的平台上有Dataflow,那么这将是我的第一个建议.否则,我的AsyncEx库中有一个类型,或者你可以编写自己的类型(正如我在博客中描述的那样).asyncAsyncProducerConsumerQueue
这是一个使用示例BufferBlock<T>:
private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
_queue.Post(e);
}
public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
return _queue.ReceiveAsync(cancellationToken);
}
Run Code Online (Sandbox Code Playgroud)
在这个例子中,我保留了你的DequeueAsyncAPI.但是,一旦开始使用TPL Dataflow,请考虑在其他地方使用它.当您需要这样的队列时,通常会发现代码的其他部分也会受益于数据流方法.例如,DequeueAsync您可以将自己链接BufferBlock到一个方法,而不是让一堆方法调用ActionBlock.
| 归档时间: |
|
| 查看次数: |
1355 次 |
| 最近记录: |