2008R2上消息队列的正确使用

ddj*_*min 5 c# msmq message-queue windows-server-2008-r2

我不是程序员,但我试图通过给他们一些指导来帮助他们。我们不再拥有任何有关 msmq 的内部专业知识。我们正在尝试使用它来将一些功能与调度应用程序集成。

调度应用程序通过使用自定义构建的 dll 进行网络调用来启动作业。dll 调用 weburl。Web 应用程序将运行其任务并向网站发送有关其执行的任务的更新。网站将消息写入队列。调用该站点的 dll 正在监视队列中是否有带有分配给该作业的标签的消息。当它收到最终状态消息时,它会关闭。

我们每隔几个小时就会收到以下消息。我们每小时运行近 100 个使用此方法的作业。在底部列出的代码中,jobid 对应于消息队列中消息的标签。每个作业在开始时都会发出一个 jobid,并将使用它作为发送到该作业的 msmq 的每条消息的标签。

 System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor.
  at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType)
  at System.Messaging.MessageEnumerator.get_Current() 
Run Code Online (Sandbox Code Playgroud)

这是它的代码。

  while ( running )
        {
            // System.Console.WriteLine( "Begin Peek" );
            messageQueue.Peek();
            //System.Console.WriteLine( "End Peek" );
            messageQueue.MessageReadPropertyFilter.SetAll();

            using ( MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2() )
            {
                enumerator.Reset();

                while ( enumerator.MoveNext() )
                {
                    Message msg = enumerator.Current;

                    if ( msg.Label.Equals( this.jobid ) )
                    {
                        StringBuilder sb = new StringBuilder();
                        /*
                        try
                        {
                            sb.Append( "Message Source: " );
                            //sb.Append( msg.SourceMachine );
                            sb.Append( " Sent: " );
                            sb.Append( msg.SentTime );
                            sb.Append( " Label " );
                            sb.Append( msg.Label );
                            sb.Append( " ID: " );
                            sb.Append( msg.Id );
                            sb.Append( " CorrelationID: " );
                            sb.Append( msg.CorrelationId );
                            sb.Append( " Body Type: " );
                            sb.Append( msg.BodyType );
                        }
                        catch ( Exception )
                        {
                            throw;
                        }
                        finally
                        {
                            System.Console.WriteLine( sb.ToString() );
                        }
                        */
                        //System.Console.WriteLine( "Receiving Message started" );
                        using ( Message message = messageQueue.ReceiveById( msg.Id ) )
                        {
                            //System.Console.WriteLine( "Receiving Message Complete" );
                            //sb = new StringBuilder();
                            string bodyText = string.Empty;

                            try
                            {
                                System.IO.StringWriter sw = new System.IO.StringWriter( sb );
                                System.IO.StreamReader sr = new System.IO.StreamReader( message.BodyStream );

                                while ( !sr.EndOfStream )
                                {
                                    sw.WriteLine( sr.ReadLine() );
                                }
                                sr.Close();
                                sw.Close();
                                bodyText = ( string ) FromXml( sb.ToString(), typeof( string ) );
                                int indx = bodyText.IndexOf( ',' );
                                string tokens = bodyText.Substring( indx + 1 );
                                indx = tokens.IndexOf( ',' );
                                string command = tokens.Substring( 0, indx );
                                tokens = tokens.Substring( indx + 1 );
                                if ( command.Equals( COMMAND_STARTED ) )
                                {
                                    System.Console.WriteLine( "STARTED " + tokens );
                                }
                                else if ( command.Equals( COMMAND_UPDATE ) )
                                {
                                    System.Console.WriteLine( tokens );
                                }
                                else if ( command.Equals( COMMAND_ENDED_OK ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Success" );
                                    finalResults = new FinalResults( 0, 0, "Success" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_WARNING ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Warning Issued" );
                                    finalResults = new FinalResults( 1, 1, "Warning" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_FAIL ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Failure" );
                                    finalResults = new FinalResults( 2, 16, "Failure" );
                                    running = false;
                                }
                            }
                            catch ( Exception )
                            {
                                throw;
                            }
                            finally
                            {
                                //System.Console.WriteLine( "Body: " + bodyText );
                            }
                        }
                    }
                }
            }
        }

        return finalResults;
    }

    MessageQueue messageQueue = null;
    string webServiceURL = "";
    Dictionary<string, string> parms = new Dictionary<string, string>();
    string jobid = "NONE";
Run Code Online (Sandbox Code Playgroud)

Nar*_*aen 3

kprobst 的解释很可能就是正在发生的事情。即使您看到队列中存在此特定消息,如果不同的应用程序(或同一应用程序的不同实例)从此队列中选取一条(任何)消息,也会使光标无效。

从本质上讲,如果多个进程从同一队列中获取数据,则此代码无法正常工作。