我想知道如何推迟使用Amqpphplib.
我用过这个很棒的咖啡脚本教程:
https://github.com/jamescarr/rabbitmq-scheduled-delivery
但它似乎不适用于PHP-amqplib.
消息按我的意愿到期,但似乎"x-dead-letter-exchange"不能完成这项工作.我使用了RabbitMQ管理控制台,我看到了所有队列的创建和删除.但是我的消息在到期后会立即进入队列.我使用RabbitMQ 3.2.3版本,PHP-amqplib 2.2.*版本.
这是我的代码:
连接类:
class Connection
{
/**
* @var $ch
*/
public $ch;
/**
* @var $consumer_tag
*/
public $consumer_tag;
/**
* @var $exchange
*/
public $exchange;
/**
* @var $conn
*/
public $conn;
public function __construct($host, $port, $user, $password, $vhost)
{
$this->exchange = 'immediate';
$this->queue = 'right.now.queue';
$this->consumer_tag = 'consumer';
$this->conn = new AMQPConnection($host, $port, $user, $password, $vhost);
$this->ch = $this->conn->channel();
$this->ch->exchange_declare($this->exchange, 'direct', false, true, false);
$this->ch->queue_declare($this->queue, false, true, false, false, false);
$this->ch->queue_bind($this->queue, …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用php-amqplib来发送和接收消息.它可以在终端上发送/接收.但是当进入Web浏览器时,无法从队列中接收它连续等待消息.我在下面的代码中使用了receive.php
require_once(__DIR__ . '/lib/php-amqplib/amqp.inc');
include_once(__DIR__ . '/config/config.php');
$connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$channel->queue_declare('test22');
$callback = function($msg){
echo $msg->body;
};
$channel->basic_consume('test22', 'consumer_tag', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
Run Code Online (Sandbox Code Playgroud)
如果我使用下面而不是回调函数但它不从队列中消耗,它会从队列中获取第一条消息
$abc=$channel->basic_get("test22", false, 2);
if(!empty($abc))
{
print_r($abc->body);
}
Run Code Online (Sandbox Code Playgroud)
这意味着消息在队列'test22'中可用.给我任何线索.
我们有一个运行RabbitMQ的测试,但不知何故达到一个"阈值",这使RabbitMQ变慢.
我们做了什么,我们设置了一个RabbitMQ服务器并制作了一个小的PHP脚本,它使用pecl-amqp连接(没有pconnect)到MQ服务器,发送1个随机消息并在此之后立即断开连接.我们将此过程分叉200次(单个php执行,因此没有线程),这意味着每秒+/- 2000个连接都会打开到MQ服务器并关闭.
之后,我在另一台服务器上启动相同的脚本来测量连接时间+读取时间.我们注意到客户端收到"Connection.Start"消息需要+/- 1.5秒.我为这个单个客户端添加了tcpdump的屏幕截图,向您展示了这种情况.我们看到的另一个问题是,有很多TCP无序和重传.
服务器规格
TCPdump http://i.imgur.com/rF1GsOy.png
sysctl设置:
fs.file-max = 11479756
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 10
net.ipv4.tcp_max_syn_backlog = 4096
net.ipv4.tcp_keepalive_intvl = 30
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_time = 7200
net.ipv4.conf.default.rp_filter = 0
net.ipv4.tcp_syncookies=1
net.core.somaxconn=2048
net.ipv4.ip_local_port_range = 10240 65535
net.nf_conntrack_max = 1000000
net.ipv4.tcp_tw_recycle = 1
net.ipv4.conf.all.arp_ignore = 1
net.ipv4.conf.all.arp_announce = 2
Run Code Online (Sandbox Code Playgroud)
rabbitmq.conf
%% -*- mode: erlang -*-
[{
rabbit,
[{
tcp_listeners, [{
"10.0.1.69", 5672
}, {
"::1", 5672
}]
},
{tcp_listen_options, [binary,
{packet, …Run Code Online (Sandbox Code Playgroud) 使用 RabbitMQ Web UI,当我发布到没有当前队列绑定的主题交换时,rabbitmq 表示消息已发布但尚未路由。
使用 amqp.node,当我将队列绑定到交换并开始使用“#”(全部)使用时,我没有得到任何东西。
我期待收到之前发布但未路由的消息。这可能吗?