我正在使用 RabbitMQ .net 客户端库以下列方式将消息发布到 RabbitMQ 节点。
var factory = new ConnectionFactory
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
VirtualHost = @"/"
};
_conn = factory.CreateConnection();
_channel = _conn.CreateModel();
_properties = _channel.CreateBasicProperties();
Run Code Online (Sandbox Code Playgroud)
然后在循环中调用下面的
using (var memStream = new MemoryStream())
{
var formatter = new BinaryFormatter();
formatter.Serialize(memStream, message);
var properties = _channel.CreateBasicProperties();
properties.Priority = Convert.ToByte((int) priority);
_channel.BasicPublish(String.Empty, _routeKey, properties, memStream.ToArray());
}
Run Code Online (Sandbox Code Playgroud)
上面的代码在中等负载下运行良好,每秒大约有 50-100 条消息。但是当我增加每秒大约 500 条消息要发布的消息数量时,RabbitMQ 节点开始出现以下错误并断开通道连接。
Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=501, text="FRAME_ERROR - type 206, first 16 octets = <<31,0,60,0,40,0,0,6,115,101,110,115,111,114,16,115>>:
{invalid_frame_end_marker, 114}
", classId=0, methodId=0, cause=:
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, Boolean immediate, IBasicProperties basicProperties, Byte[] body)
at RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, Byte[] body)
at RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)`
Run Code Online (Sandbox Code Playgroud)
BinaryFormatter 的消息大小约为 5kb。
| 归档时间: |
|
| 查看次数: |
2118 次 |
| 最近记录: |