为什么我的流被同行关闭?

fre*_*tje 0 .net c# stomp nms apache-nms

我有以下控制台程序,用于侦听 ActiveMQ Stomp 服务器上的目标(队列或主题,这并不重要),并简单地将收到的消息记录到控制台:

\n\n
using System;\nusing Apache.NMS.Stomp;\nusing Apache.NMS;\nusing Apache.NMS.Util;\n\nnamespace StompTest\n{\n    class Program\n    {\n        static void Main(string[] args)\n        {\n            try\n            {\n                var connectionFactory = new ConnectionFactory("stomp:tcp://mybroker:61613");\n\n                var connection = connectionFactory.CreateConnection();\n                connection.ExceptionListener += new ExceptionListener(connection_ExceptionListener);\n                connection.Start();\n\n                var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);\n\n                IDestination dest = SessionUtil.GetDestination(session, "queue://MyQueue");\n\n                var consumer = session.CreateConsumer(dest);\n                consumer.Listener += new MessageListener(consumer_Listener);\n\n                Console.ReadKey();\n            }\n            catch (NMSException ex)\n            {\n                Console.WriteLine("NMSException !! ==> " + ex.Message);\n            }\n        }\n\n        static void connection_ExceptionListener(Exception exception)\n        {\n            Console.WriteLine("Exception!! ==> " + exception.ToString());\n        }\n\n        static void consumer_Listener(IMessage message)\n        {\n            var textMessage = message as ITextMessage;\n            if (textMessage == null)\n                Console.WriteLine("No ITextMessage...");\n            else\n                Console.WriteLine("Received => " + textMessage.Text);\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

当我启动它时,只要我继续发送消息,它就可以正常工作。但是当有 30 秒不活动时,我会遇到异常。即使不向队列发送消息也会发生这种情况:

\n\n\n\n
using System;\nusing Apache.NMS.Stomp;\nusing Apache.NMS;\nusing Apache.NMS.Util;\n\nnamespace StompTest\n{\n    class Program\n    {\n        static void Main(string[] args)\n        {\n            try\n            {\n                var connectionFactory = new ConnectionFactory("stomp:tcp://mybroker:61613");\n\n                var connection = connectionFactory.CreateConnection();\n                connection.ExceptionListener += new ExceptionListener(connection_ExceptionListener);\n                connection.Start();\n\n                var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);\n\n                IDestination dest = SessionUtil.GetDestination(session, "queue://MyQueue");\n\n                var consumer = session.CreateConsumer(dest);\n                consumer.Listener += new MessageListener(consumer_Listener);\n\n                Console.ReadKey();\n            }\n            catch (NMSException ex)\n            {\n                Console.WriteLine("NMSException !! ==> " + ex.Message);\n            }\n        }\n\n        static void connection_ExceptionListener(Exception exception)\n        {\n            Console.WriteLine("Exception!! ==> " + exception.ToString());\n        }\n\n        static void consumer_Listener(IMessage message)\n        {\n            var textMessage = message as ITextMessage;\n            if (textMessage == null)\n                Console.WriteLine("No ITextMessage...");\n            else\n                Console.WriteLine("Received => " + textMessage.Text);\n        }\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

通过谷歌搜索并阅读 Apache.NMS 源代码,我发现它必须对 \xc2\xb4InactivityMonitor\xc2\xb4 做一些事情,显然,当我在连接字符串上指定参数时,transport.useInactivityMonitor=false我不这样做得到异常,一切都运行良好。

\n\n

但据我认为我了解一切,inactivityMonitor 有一个目的:确保检测到“死”连接并正确清理。

\n\n

所以一定还有其他问题!我在上面的代码中添加了一个简单的控制台跟踪器,这是不带参数连接时的输出transport.useInactivityMonitor=false

\n\n
Exception!! ==> Apache.NMS.Stomp.IOException: Peer closed the stream.\n   at Apache.NMS.Stomp.Protocol.StompFrame.ReadLine(BinaryReader dataIn) in c:\\d\nev\\NMS.Stomp\\src\\main\\csharp\\Protocol\\StompFrame.cs:line 284\n   at Apache.NMS.Stomp.Protocol.StompFrame.ReadCommandHeader(BinaryReader dataIn\n) in c:\\dev\\NMS.Stomp\\src\\main\\csharp\\Protocol\\StompFrame.cs:line 208\n   at Apache.NMS.Stomp.Protocol.StompFrame.FromStream(BinaryReader dataIn) in c:\n\\dev\\NMS.Stomp\\src\\main\\csharp\\Protocol\\StompFrame.cs:line 197\n   at Apache.NMS.Stomp.Protocol.StompWireFormat.Unmarshal(BinaryReader dataIn) i\nn c:\\dev\\NMS.Stomp\\src\\main\\csharp\\Protocol\\StompWireFormat.cs:line 121\n   at Apache.NMS.Stomp.Transport.Tcp.TcpTransport.ReadLoop() in c:\\dev\\NMS.Stomp\n\\src\\main\\csharp\\Transport\\Tcp\\TcpTransport.cs:line 279\n
Run Code Online (Sandbox Code Playgroud)\n\n

所以我认为我的客户因为太长时间不活跃而被经纪人“踢”了。但我真的不明白为什么,从上面的日志来看,我的客户端实际上应该发送“keepalive”消息。所以它不应该处于非活动状态。

\n\n

对于如何继续解决这个问题,我别无选择。如果有人对此有一些见解,我们将不胜感激!

\n\n

更新
\n版本号:

\n\n
    \n
  • 库:Apache.NMS.Stomp v1.5.3
  • \n
  • 代理:Apache ActiveMQ 5.6.0
  • \n
\n

小智 5

尝试使用 stomp故障转移传输。所以而不是...

var connectionFactory = new ConnectionFactory("stomp:tcp://mybroker:61613");
Run Code Online (Sandbox Code Playgroud)

使用...

var connectionFactory = new ConnectionFactory("failover:tcp://mybroker:61613");
Run Code Online (Sandbox Code Playgroud)

现在,传输将自动重新连接,而不是引发 ExceptionListener 事件并由您来解决。

仅供参考:如果您需要在发生断开/重新连接时收到通知,则会引发 Connection 对象上的 ConnectionInterruptedListener 和 ConnectionResumedListener 事件。

还:

我建议使用 Session.CreateDurableConsumer 而不仅仅是 Session.CreateConsumer。这样您在断开/重新连接期间就不会丢失任何消息。

希望这对某人有帮助。