这是使用消息队列的正确方法吗?

Par*_*eog 15 php sockets linux message-queue zeromq

我是消息队列的新手,现在我ZeroMQ在我的Linux服务器上使用.我PHP用来写客户端和服务器.这主要用于处理推送通知.

正如他们所展示的那样,我在单个I/O线程实例上使用基本的REQ- REP形式 - 通信模式ZMQContext.

这是最小化的zeromqServer.php代码:

include("someFile.php");

$context = new ZMQContext(1);

//  Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {
    $request = $responder->recv();
    printf ("Received request: [%s]\n", $request);

    //  -----------------------------------------------------------------
    //                                    Process push notifications here
    //
    sleep (1); 

    //  -----------------------------------------------------------------
    //                                          Send reply back to client
    $responder->send("Basic Reply");
}
Run Code Online (Sandbox Code Playgroud)

这是一个最小化的ZeroMQ 客户端:

$context = new ZMQContext();

//  Socket to talk to server
echo "Connecting to hello world server…\n";
$requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
$check = $requester->connect("tcp://localhost:5555");

var_dump($check);
$requester->send("json string payload with data required to process push notifications.");

//$reply = $requester->recv();
Run Code Online (Sandbox Code Playgroud)

那么,我做什么?我zeromqServer.php使用linux命令运行后台服务

nohup php zeromqServer.php &

这将它作为后台进程运行.现在,当客户端调用它时,它会执行所需的工作.

但问题是,每当任何文件发生变化时(包括文件中的include-ed zeromqServer),我都需要重新启动进程.

此外,不知何故,2-3天之后,它就会停止工作.这个过程不会停止,但它会停止工作.

我觉得它必须是一些套接字问题,也许套接字不再打开.那时我必须重启zeromqServer.php文件进程.

Q1: 可能是什么问题?

Q2: 什么是正确的方法呢?

use*_*197 5

这不是使用Messaging Queue的正确方法.

A1:问题是您的服务器最终必须阻止,因为客户端代码在REQ/REP-pattern要求时不检索任何答案.的zeromqServer.phpREP侧将简单地不试图recv()从一个相关联的客户的另一消息(在REQ正式通信构成形式的侧),直到所述客户端已经物理递送(到内部缓冲器),并已recv()-ed的从"回复" -message zeromqServer.php侧.

早期版本的ZeroMQ,ver.2.1等,用于在将数据复制到O/S内核之前,对节点的内部消息队列和内存管理进行无限制,无限的默认限制,用于低级I/O线程缓冲资源并从ZeroMQ内存占用中释放.

较新版本的ZeroMQ,版本3.x +,默认情况下具有所谓的HWM -is(又名High-Water-Mark-s)"仅"1000条消息"短",之后相应的ZeroMQ资源启动阻止或删除邮件.

虽然明确增加HWM设置管理的反应性尝试看起来像是解决主要设计错误的一种肮脏方式,但另一个ZeroMQ警告对此是公平的,以便进一步谨慎朝这个方向发展(ØMQ不保证套接字将接受为许多ZMQ_SNDHWM消息,实际限制可能会 60-70%,具体取决于套接字上的消息流量).

忘记或无法做到这一点(参考: OP代码已经证明):

//$reply = $requester->recv();
Run Code Online (Sandbox Code Playgroud)

意味着你的REQ/REP正式沟通模式"钟摆"变得不可逆转地陷入僵局(永远).


A2:基本REQ/REP形式 - 通信 - 模式听起来很直,但有一些危险的特征,观察到的阻塞只是其中之一.一些额外的步骤可能采取代码的角度来看,部署XREQ/XREP,DEALER/ROUTER和其他工具,但设计不应该仅仅修订上爬起来,SLOCSLOC作为似乎有很多事情在设计代码之前实现.一个主要的错误是假设一个send()方法被订购后发送一条消息.在ZeroMQ中并非如此.

此外,代码设计应该假定消息的不确定性,并且正确处理丢失的消息问题和任何类型的分布式服务阻塞事件(死锁,活锁,缓冲区阈值溢出,旧/新API冲突,因为那里没有任何明确保证您的任何消息传递对等方(ZeroMQ中没有中央消息代理)在其localhost端实现了相同版本的ZeroMQ API /协议 - 因此确实有很多新的观点,在此期间代码设计)


这样做的最好方法

如果你可以相信并相信一段实践经验,那么你最好的下一步应该是下载和阅读神话般的Pieter HINTJENS的书"Code Connected,Volume 1",其中Pieter对分布式处理有很多见解,包括许多可靠模式的提示和方向,正如您想要实现的那样.

读这本书,它既值得你花时间,如果你留在分布式软件设计中,你可能会多次重新阅读这本书,所以不要犹豫,立即开始跳到硕士硕士这样一本400多页的食谱,Pieter HINTJENS出的问题是.

为什么?现实世界通常要复杂得多

只有一张图片,图60中的上述书籍忘记了个人原型的重复使用,并意识到需要一个适当的端到端分布式系统设计视角,包括阻止避免死锁解决策略:

运输机+ SIG飞机

只是为了有所了解,请从简单的分布式消息传递中查看以下代码示例,其中aMiniRESPONDER进程使用多个ZeroMQ通道.

在此输入图像描述


如何在相当大的Web PHP应用程序域中改进您的实现?

学习如何防止(设计方面)和处理(deux-ex-machina类型)其他碰撞.

PHP拥有适用于此类算法的所有合适的语法构造函数,但架构和设计始终掌握在您的手中.

只是要意识到,{ try:, except:, finally: }ZeroMQ信令基础设施设置/系统部分/ ZeroMQ优雅终止工作的冲突感知风格有多大,请检查[SoW]行数:

14544 - 14800 // a safe infrastructure setup on aMiniRESPONDER side   ~ 256 SLOCs
15294 - 15405 // a safe infrastructure graceful termination          ~ 110 SLOCs
Run Code Online (Sandbox Code Playgroud)

aMiniRESPONDER示例的事件处理部分的核心逻辑相比较

14802 - 15293 // aMiniRESPONDER logic, incl. EXC-HANDLERs             ~ 491 SLOCs
Run Code Online (Sandbox Code Playgroud)

关于基于ZeroMQ的分布式系统的最后一点

苛刻?是的,但非常强大,可扩展,快速,并且确实有益于正确使用.不要犹豫,投入时间和精力来获取和管理您在该领域的知识.您所有进一步的软件项目可能只会受益于此专业投资.


apo*_*fos 2

我只能部分回答这个问题。我不知道为什么该过程会在 2-3 天后挂起。

总体而言,PHP 脚本在脚本执行时加载一次,似乎没有任何方法可以解决此限制。但是,您当前的代码可以重写如下:

一些文件.php

$context = new ZMQContext(1);

//  Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {
    $request = $responder->recv();
    printf ("Received request: [%s]\n", $request);

    $subscriptParams = [
        "Request" => $request //Add more parameters here as needed
    ];

    $result = shell_exec("php work.php ".base64_encode(serialize($subscriptParams)));       

    //  Send reply back to client
    $responder->send("Basic Reply");
}
Run Code Online (Sandbox Code Playgroud)

工作.php

if (!isset($argv[0])) {
   die("Need an argument");
}

$params = unserialize(base64_decode($argv[0]));
//Validate parameters
$request = $params["Request"];
//  Do some 'work'
//Process push notifications here!
sleep (1); 
Run Code Online (Sandbox Code Playgroud)

这是我的假设:

  1. 脚本的设置部分,例如设置$context$responder将永远保持不变(或者您可以忍受由于更改而重新启动脚本所需的停机时间)。

  2. 循环的开始和结束保持不变,我的想法是,shell_exec将返回一个响应,响应者可以将其用作实际响应。

更多澄清:

我用来传递需要使用的serialize参数数组。work.php我正在进行base64编码解码,因为我想确保整个参数都适合,$argv[0]并且不会在参数中找到的潜在空间上被分割。

的结果可以使用相同serialize -> base64_encode和组合。 base64_decode -> deserializework.php

请注意,我根本没有亲自尝试过这段代码,所以我不能保证它有效。我只是不明白为什么它不起作用(请确保php它在您的路径中,或者如果不在/usr/bin/php您的路径中,请调用您的路径)。shell_exec

值得注意的是,此解决方案比将所有代码放在一个文件中要慢得多,但这是每次迭代时刷新脚本的成本。