如何获取Websphere MQ连接状态以及如何重置连接:

Sam*_*Sam 8 .net c# websphere mq ibm-mq

1.)从.net客户端,我如何测试客户端是否连接到服务器(即可以发送和接收)是的,我可以在try块内发送消息并捕获随后的异常,但我希望更优雅的解决方案.

2)如何打开,关闭和重新打开连接?在我尝试解决上面的问题1时,我发现如果我打开连接然后调用connection.Close()我无法从连接工厂获得另一个连接(请参阅下面的代码片段).我收到错误消息XMSCC0008

我使用的是非常标准的vanilla MQ配置.以下是我的客户端连接方式:

ISession session = MQAccess.GetSession(MQAccess.Connection);
IDestination destination = session.CreateTopic(SubTopicName);
Consumer = MQAccess.GetConsumer(session, destination);
Consumer.MessageListener = new MessageListener(HandleMQSubEvent);
MQAccess.Connection.Start();
Run Code Online (Sandbox Code Playgroud)

其中MQAccess是一个小实用程序类.

编辑了添加MQAccess代码的问题:

public static class MQAccess
{
    public static readonly MQConfigurationSectionHandler ConfigSettings;
    public static readonly IConnectionFactory ConnectionFactory;

    private static readonly IConnection connection;
    public static IConnection Connection
    {
        get { return connection; }
    }

    static MQAccess()
    {
        ConfigSettings = (MQConfigurationSectionHandler)
            ConfigurationManager.GetSection("mq-configuration");

        XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        ConnectionFactory = factory.CreateConnectionFactory();
        ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
        ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
        ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);

        if (ConfigSettings.QueueManager == string.Empty)
        {
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
        }
        else
        {
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);
        }

        connection = GetConnection();
    }

    public static IConnection GetConnection()
    {
        return ConnectionFactory.CreateConnection();
    }

    public static ISession GetSession(IConnection connection)
    {
        return connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
    }

    public static IMessageProducer GetProducer(ISession session, IDestination destination)
    {
        return session.CreateProducer(destination);
    }

    public static IMessageConsumer GetConsumer(ISession session, IDestination destination)
    {
        return session.CreateConsumer(destination);
    }

    public static void MQPub(string TopicURI, string message)
    {
        using (var session = GetSession(Connection))
        {
            using (var destination = session.CreateTopic(TopicURI))
            {
                using (var producer = GetProducer(session, destination))
                {
                    producer.Send(session.CreateTextMessage(message));
                }
            }
        }
    }

    public static void MQPub(string TopicURI, IEnumerable<string> messages)
    {
        using (var session = GetSession(Connection))
        {
            using (var destination = session.CreateTopic(TopicURI))
            {
                using (var producer = GetProducer(session, destination))
                {
                    foreach (var message in messages)
                    {
                        producer.Send(session.CreateTextMessage(message));
                    }
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

编辑:将MQAccess类重命名为MQClient.根据T Rob建议将其设为实例类.断开连接方法仍然与上面列出的错误消息崩溃

public class MQClient : IDisposable
{
    public MQConfigurationSectionHandler ConfigSettings { get; private set; }
    public IConnectionFactory ConnectionFactory { get; private set; }

    public IConnection Connection { get; private set;  }

    public IMessageConsumer Consumer { get; private set; }
    public IMessageProducer Producer { get; private set; }
    // Save sessions as fields for disposing and future subscription functionality
    private ISession ProducerSession;
    private ISession ConsumerSession;
    public string SubTopicName { get; private set; }
    public string PubTopicName { get; private set; }
    public bool IsConnected { get; private set; }
    public event Action<Exception> ConnectionError;
    private Action<IMessage> IncomingMessageHandler;

    public MQClient(string subTopicName, string pubTopicName, Action<IMessage> incomingMessageHandler)
    {
        // Dont put connect logic in the constructor.  If we lose the connection we may need to connect again.
        SubTopicName = subTopicName;
        PubTopicName = pubTopicName;
        IncomingMessageHandler = incomingMessageHandler;
    }

    public string Connect()
    {
        IsConnected = false;
        string errorMsg = string.Empty;

        ConfigSettings = (MQConfigurationSectionHandler)
                ConfigurationManager.GetSection("mq-configuration");

        XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        ConnectionFactory = factory.CreateConnectionFactory();
        ConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, ConfigSettings.Hostname);
        ConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, ConfigSettings.Port);
        ConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, ConfigSettings.Channel);

        if (ConfigSettings.QueueManager == string.Empty)
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "");
        else
            ConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, ConfigSettings.QueueManager);

        Connection = ConnectionFactory.CreateConnection();


        if (!string.IsNullOrEmpty(PubTopicName))
        {
            ProducerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
            Producer = ProducerSession.CreateProducer(ProducerSession.CreateTopic(PubTopicName));
        }

        if (!string.IsNullOrEmpty(SubTopicName) && IncomingMessageHandler != null)
        {
            ConsumerSession = Connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
            Consumer = ConsumerSession.CreateConsumer(ConsumerSession.CreateTopic(SubTopicName));
            Consumer.MessageListener = new MessageListener(IncomingMessageHandler);
        }

        try
        {
            Connection.Start();
            Connection.ExceptionListener = new ExceptionListener(ConnectionExceptionHandler);
            IsConnected = true;
        }
        catch (TypeInitializationException ex)
        {
            errorMsg = "A TypeInitializationException error occured while attempting to connect to MQ.  Check the Queue configuration in App.config. The error message is: " + ex.Message; 
        }
        catch (IllegalStateException ex)
        {
            errorMsg = "An IllegalStateException error occured while attempting to connect to MQ.  Check the Queue configuration in App.config. The error message is: " + ex.Message; 
        }

        return errorMsg;
    }

    public void Disconnect()
    {
        if (Producer != null)
        {
            Producer.Close();
            Producer.Dispose();
            Producer = null;
        }

        if (ProducerSession != null)
        {
            // Call Unsubscribe here if subscription is durable

            ProducerSession.Close();
            ProducerSession.Dispose();
            ProducerSession = null;
        }

        if (Connection != null)
        {
            Connection.Stop();

            //if (Connection.ExceptionListener != null)
            //    Connection.ExceptionListener = null;

            // Per Shashi............
            //if (Consumer.MessageListener != null)
            //    Consumer.MessageListener = null;

            Connection.Close();
            Connection.Dispose();
            Connection = null;
        }

        if (Consumer != null)
        {

            if (Consumer.MessageListener != null)
                Consumer.MessageListener = null;

            Consumer.Close();
            Consumer.Dispose();
            Consumer = null;
        }


        if (ConsumerSession != null)
        {
            // Call Unsubscribe here if subscription is durable
            ConsumerSession.Close();
            ConsumerSession.Dispose();
            ConsumerSession = null;
        }

        IsConnected = false;
    }


    public void Publish(string message)
    {
        Producer.Send(ProducerSession.CreateTextMessage(message));
    }


    public void Publish(string[] messages)
    {
        foreach (string msg in messages)
            Publish(msg);
    }

    public void ConnectionExceptionHandler(Exception ex)
    {
        Disconnect(); // Clean up

        if (ConnectionError != null)
            ConnectionError(ex);
    }

    #region IDisposable Members
    private bool disposed;

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
                Disconnect();

            disposed = true;
        }
    }
    #endregion

}
Run Code Online (Sandbox Code Playgroud)

T.R*_*Rob 8

问题出在这里 - > where MQAccess is a small utility class.

问题的第一部分询问如何判断连接是否处于活动状态.WebSphere MQ的XMS类是非Java平台的JMS规范的实现.它们非常密切地遵循JMS规范,并且JMS规范在连接或会话上没有相应的方法,isConnected因此XMS也没有.但是,所有GET和PUT活动都应该在try/catch块中进行,以便捕获JMS异常.(从中总是打印出来linkedException,对吗?)当抛出JMS异常时,应用程序要么将其视为致命并死亡,要么关闭除连接工厂之外的所有JMS对象,等待几秒钟然后重新驱动连接序列.

更新基于问题中的新信息:
感谢发布MQAccess类.这提供了相当深入的了解正在发生的事情,尽管仍然没有任何代码显示连接关闭和重新打开的位置,按照问题的第2部分.

但是,代码显示该类在构造类实例时MQAccess创建一个私有实例,ICONNECTION connection然后将其公开公开MQAccess.GetConnection.MQAccess当前发布的类没有公共或私有类方法,它们将替换所持有的连接句柄,connection如果MQAccess.Connection.Close()被调用,IConnectionMQAccess类中的对象实例将在持有无效连接句柄后永远存在.连接关闭后,该实例MQAccess实际上已经死亡.您必须删除并重新实例化MQAccess才能获得新连接.

MQAccess类不公开暴露连接工厂因此理论上将有可能调用MQAccess.GetConnection从类的外部并获得有效的新IConnection对象,即使在关闭原来的后.但是,该实例将存在于MQAccess类的范围之外,因此任何后续调用都MQAccess将引用其已解除的实例变量,connection而不是在类外部创建的新连接实例.

如果需要关闭并重新创建连接,可以考虑从内部管理连接MQAccess.一种低技术方法可能是MQAccess.Close()为连接编写一种方法,它将关闭现有连接然后立即调用,connection = GetConnection();以便私有connection变量始终保持有效的连接句柄.

如果这不能解决问题,请发布正在关闭的代码并重新创建连接.

顺便说一句,通过网络连接的非事务会话打开了丢失或复制任何JMS提供程序(包括WMQ)的消息的可能性.这是你的意图吗?我已经解释了为什么这是在其他SO张贴在这里.


Sha*_*shi 5

添加T.Rob的评论.

问题1:
我希望您能够访问源代码MQAccess.如果是,您可以公开一个属性MQAccess,指示连接是否处于活动状态.如果您没有访问权限,则可能必须要求该类的作者添加此属性.您可以执行以下操作来设置/重置属性.

1)在createConnection方法成功返回后设置属性.
2)为连接设置Exception监听器.
3)重置异常处理程序中的属性.检查原因代码并重置属性,如果它是连接中断错误(XMSWMQ1107和链接的异常可以具有MQRC 2009).

问题2
如果您能告诉我们您的情况closingreopening关系,将会有所帮助.我建议关闭连接是:
1)首先做一个connection.Stop().
2)删除任何消息监听器,基本上做一个consumer.MessageListener = null.
3)然后执行connection.Close().
4)做一个连接= null

附加信息 以下是我用来测试的样本.

    private void OnException(Exception ex)
    {
        XMSException xmsex = (XMSException)ex;
        Console.WriteLine("Got exception");
        // Check the error code.
        if (xmsex.ErrorCode == "XMSWMQ1107")
        {
            Console.WriteLine("This is a connection broken error");
            stopProcessing = true; // This is a class member variable
        }
    }
Run Code Online (Sandbox Code Playgroud)

在创建连接的方法中,设置异常侦听器.

        // Create connection.
        connectionWMQ = cf.CreateConnection();
        connectionWMQ.ExceptionListener = new ExceptionListener(OnException);
Run Code Online (Sandbox Code Playgroud)

只要存在连接错误,就会调用异常监听器并将flag设置为true.

在不再需要对象时处置对象是一种很好的做法.有父子关系,Consumer,Producer等是Session的孩子,而Session又是Connection的孩子.因此处置的顺序可以是孩子优先和父母下一个.但如果处置父母,孩子也会自动处理.