use*_*154 5 c# multithreading asynchronous servicebus context-switch
我正在Azure上运行Service Bus,每秒大约需要10-100条消息.
最近我转而使用.net 4.5并且所有兴奋重构的所有代码都在每行中至少有两次'async'和'await ',以确保它'正确'完成':)
现在我想知道它实际上是好还是坏.如果你能看一下代码片段,请告诉我你的想法.我特别担心,如果线程上下文切换没有给我带来更多的悲伤而不是从所有的异步中获益......(看看!dumpheap肯定是一个因素)
只是一些描述 - 我将发布2个方法 - 一个在ConcurrentQueue上执行while循环,等待新消息和另一个一次发送一个消息的方法.我也正在使用Transient Fault Handling块,正如Azure博士所规定的那样.
发送循环(从头开始,等待新消息):
private async void SendingLoop()
{
try
{
await this.RecreateMessageFactory();
this.loopSemaphore.Reset();
Buffer<SendMessage> message = null;
while (true)
{
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
this.semaphore.WaitOne();
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
while (this.queue.TryDequeue(out message))
{
try
{
using (message)
{
//only take send the latest message
if (!this.queue.IsEmpty)
{
this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
continue;
}
else
{
if (this.Topic == null || this.Topic.Path != message.Value.Topic)
await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);
if (this.cancel.Token.IsCancellationRequested)
break;
await this.SendMessage(message, this.cancel.Token);
}
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
ex.LogError();
}
}
}
}
catch (OperationCanceledException)
{ }
catch (Exception ex)
{
ex.LogError();
}
finally
{
if (this.loopSemaphore != null)
this.loopSemaphore.Set();
}
}
Run Code Online (Sandbox Code Playgroud)
发送消息:
private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
{
//this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
bool entityNotFound = false;
if (this.MessageSender.IsClosed)
{
//this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
await this.EnsureMessageSender(cancellationToken);
}
try
{
await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
{
message.Value.Body.Seek(0, SeekOrigin.Begin);
using (var msg = new BrokeredMessage(message.Value.Body, false))
{
await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
}
}, cancellationToken);
}
catch (MessagingEntityNotFoundException)
{
entityNotFound = true;
}
catch (OperationCanceledException)
{ }
catch (ObjectDisposedException)
{ }
catch (Exception ex)
{
ex.LogError();
}
if (entityNotFound)
{
if (!cancellationToken.IsCancellationRequested)
{
await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
}
}
}
Run Code Online (Sandbox Code Playgroud)
上面的代码来自发送1个消息/秒的'Sender'类.我在任何给定的时间都有大约50-100个实例运行,所以它可能是相当多的线程.
顺便说一下,不要担心EnsureMessageSender,RecreateMessageFactory,EnsureTopicExists太多,它们不经常被调用.
如果只需要一个后台线程处理消息队列并同步发送消息,我是不是更好,只要我需要的是一次发送一条消息,不用担心异步的东西并避免它带来的开销.
请注意,通常将一条消息发送到Azure Service Bus只需几毫秒,但这并不贵.(除非它很慢,超时或Service Bus后端出现问题,它可能会暂停一段时间尝试发送内容).
谢谢和抱歉长篇文章,
斯特沃
提出的解决方案
这个例子可以解决我的情况吗?
static void Main(string[] args)
{
var broadcaster = new BufferBlock<int>(); //queue
var cancel = new CancellationTokenSource();
var run = Task.Run(async () =>
{
try
{
while (true)
{
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
//async wait until a value is available
var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
int next = 0;
//greedy - eat up and ignore all the values but last
while (broadcaster.TryReceive(out next))
{
Console.WriteLine("Skipping " + val);
val = next;
}
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
Console.WriteLine("Sending " + val);
//simulate sending delay
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine("Value sent " + val);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}, cancel.Token);
//simulate sending messages. One every 200mls
for (int i = 0; i < 20; i++)
{
Console.WriteLine("Broadcasting " + i);
broadcaster.Post(i);
Thread.Sleep(200);
}
cancel.Cancel();
run.Wait();
}
Run Code Online (Sandbox Code Playgroud)
你说:
上面的代码来自发送1个消息/秒的'Sender'类.我在任何给定的时间都有大约50-100个实例运行,所以它可能是相当多的线程.
这是异步的一个很好的例子.你在这里节省了很多线程.Async 减少了上下文切换,因为它不是基于线程的.在需要等待的情况下,它不会进行上下文切换.相反,下一个工作项正在同一个线程上处理(如果有的话).
因此,异步解决方案肯定会比同步解决方案更好.是否需要测量在工作流程的50-100个实例中实际使用较少的CPU.实例越多,异步更快的概率就越高.
现在,实现存在一个问题:您正在使用ConcurrentQueue
不是异步准备的.因此,即使在异步版本中,您实际上也使用了50-100个线程.它们要么阻塞(你想要避免),要么忙 - 等待燃烧100%CPU(在你的实现中似乎就是这种情况!).您需要摆脱这个问题并使排队异步.也许这SemaphoreSlim
是一种帮助,因为它可以异步等待.
归档时间: |
|
查看次数: |
4557 次 |
最近记录: |