我正在处理一个巨大的xml文档(其中包含大约一百万个条目),然后使用rabbitmq将格式化版本导入数据库.每次发布大约200,000个条目后,我收到管道错误,而rabbitmq无法从中恢复.
注意错误:fwrite():发送2651字节失败,errno = 11资源暂时不可用[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,line 439]
注意错误:fwrite():发送33个字节失败,errno = 104连接由[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,line 439]中的同行重置
注意错误:fwrite():在[/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,line 439]的errno = 32 Broken pipe中发送19个字节失败
这随后导致节点关闭错误,并且需要手动终止该进程以从中恢复.
这些是我的班级方法: -
public function publishMessage($message) {
if (!isset($this->conn)) {
$this->_createNewConnectionAndChannel();
}
try {
$this->ch->basic_publish(
new AMQPMessage($message, array('content_type' => 'text/plain')),
$this->defaults['exchange']['name'],
$this->defaults['binding']['routing_key']
);
} catch (Exception $e) {
echo "Caught exception : " . $e->getMessage();
echo "Creating new connection.";
$this->_createNewConnectionAndChannel();
$this->publishMessage($message); // try again
}
}
protected function _createNewConnectionAndChannel() {
if (isset($this->conn)) {
$this->conn->close();
}
if(isset($this->ch)) {
$this->ch->close();
}
$this->conn = new AMQPConnection(
$this->defaults['connection']['host'],
$this->defaults['connection']['port'],
$this->defaults['connection']['user'],
$this->defaults['connection']['pass']
);
$this->ch = $this->conn->channel();
$this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
$this->ch->basic_qos(0 , 20 , 0); // fair dispatching
$this->ch->queue_declare(
$this->defaults['queue']['name'],
$this->defaults['queue']['passive'],
$this->defaults['queue']['durable'],
$this->defaults['queue']['exclusive'],
$this->defaults['queue']['auto_delete']
);
$this->ch->exchange_declare(
$this->defaults['exchange']['name'],
$this->defaults['exchange']['type'],
$this->defaults['exchange']['passive'],
$this->defaults['exchange']['durable'],
$this->defaults['exchange']['auto_delete']
);
$this->ch->queue_bind(
$this->defaults['queue']['name'],
$this->defaults['exchange']['name'],
$this->defaults['binding']['routing_key']
);
}
Run Code Online (Sandbox Code Playgroud)
任何帮助将不胜感激.
确保在Rabbit MQ上为用户添加了虚拟主机访问权限.我为"/"主机创建了新用户并忘记了设置访问权限,默认情况下使用该权限.
您可以通过管理面板yourhost执行此操作:15672>管理员>单击用户>查找"设置权限".
PS我假设您的RabbitMQ服务正在运行,用户存在且密码正确.