我们有一个服务从n个消息队列接收消息.但是,如果重新启动消息队列服务,即使消息队列服务已成功重新启动,消息检索服务也会停止接收消息.
我试图专门捕获消息检索服务中抛出的MessageQueueException并再次调用队列的BeginReceive方法.但是,在消息队列服务重新启动的2秒左右,我得到大约1875个异常实例,然后当我们的StartListening方法中抛出另一个MessageQueueException时,服务停止运行.
有没有一种优雅的方法从消息队列服务重新启动恢复?
private void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
MessageQueue queue = (MessageQueue)sender;
try
{
Message message = queue.EndReceive(e.AsyncResult);
this.StartListening(queue);
if (this.MessageReceived != null)
this.MessageReceived(this, new MessageReceivedEventArgs(message));
}
catch (MessageQueueException)
{
LogUtility.LogError(String.Format(CultureInfo.InvariantCulture, StringResource.LogMessage_QueueManager_MessageQueueException, queue.MachineName, queue.QueueName, queue.Path));
this.StartListening(queue);
}
}
public void StartListening(MessageQueue queue)
{
queue.BeginReceive();
}
Run Code Online (Sandbox Code Playgroud)
我需要处理这个导致的无限循环问题并清理一下但是你明白了.
发生MessageQueueException时,调用RecoverQueue方法.
private void RecoverQueue(MessageQueue queue)
{
string queuePath = queue.Path;
bool queueRecovered = false;
while (!queueRecovered)
{
try
{
this.StopListening(queue);
queue.Close();
queue.Dispose();
Thread.Sleep(2000);
MessageQueue newQueue = this.CreateQueue(queuePath);
newQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(this.OnReceiveCompleted);
this.StartListening(newQueue);
LogUtility.LogInformation(String.Format(CultureInfo.InvariantCulture, "Message queue {0} recovered successfully.", newQueue.QueueName));
queueRecovered = true;
}
catch (Exception ex)
{
LogUtility.LogError(String.Format(CultureInfo.InvariantCulture, "The following error occurred while trying to recover queue: {0} error: {1}", queue.QueueName, ex.Message));
}
}
}
public void StopListening(MessageQueue queue)
{
queue.ReceiveCompleted -= new ReceiveCompletedEventHandler(this.OnReceiveCompleted);
}
Run Code Online (Sandbox Code Playgroud)
在收到服务重新启动的结果异常后,您必须释放旧的MessageQueue,即取消对您的ReceiveCompleted事件的处理,处理MessageQueue等等.然后在新实例上再次创建一个新实例MessageQueue并挂接到该ReceiveCompleted事件MessageQueue.
或者,您可以使用轮询方法在特定时间间隔内创建新实例,调用MessageQueue.Receive(TimeSpan),等待传入消息或直到超时发生.在这种情况下,您处理消息并销毁MessageQueue实例并再次启动迭代.
通过MessageQueue每次重新创建,您可以确保内置恢复.此外,MessageQueue由于底层队列的内部缓存,创建的开销很小.
伪代码...
while (!notDone)// or use a timer or periodic task of some sort...
{
try
{
using (MessageQueue queue = new MessageQueue(queuePath))
{
Message message = queue.Receive(TimeSpan.FromMilliseconds(500));
// process message
}
}
catch (MessageQueueException ex)
{
// handle exceptions
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2976 次 |
| 最近记录: |