Run a function in a PHP CLI script after a period of inactivity

Xat*_*too 5 php rabbitmq symfony reactphp

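I am using Symfony2 with RabbitMqBundle to create a worker that sends documents to ElasticSearch. Indexing documents one at a time is much slower than using the ElasticSearch bulk API, so I created a buffer that flushes documents to ES in batches of 1000. The code looks (slightly simplified) like this: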

class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this number of documents, the buffer's content
    // is sent to ElasticSearch for indexing.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when max buffersize has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;
    }
}

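This all works fine, but there is one small problem. The queue fills with messages at an unpredictable rate: sometimes 100,000 in 5 minutes, sometimes not a single one for an hour. For example, when 82,671 documents are queued, the last 671 are not indexed until another 329 documents arrive, which can take hours. I would like to be able to do something like the following.

Warning: sci-fi code! This obviously does not work: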

class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;
    protected $flushTimer;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this number of documents, the buffer's content
    // is sent to ElasticSearch for indexing.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;

        // Highly Sci-fi code
        $this->flushTimer = new Timer();
        // Flush buffer after 5 minutes of inactivity.
        $this->flushTimer->setTimeout(5 * 60);
        $this->flushTimer->setCallback([$this, 'flush']);
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when max buffersize has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        } else {
            // Start a timer that will flush the buffer after a timeout.
            $this->initTimer();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;

        // There are no more messages to send, so stop the timer.
        $this->flushTimer->stop();
    }

    protected function initTimer()
    {
        // Start or restart timer
        $this->flushTimer->isRunning()
          ? $this->flushTimer->reset()
          : $this->flushTimer->start();
    }
}

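Now, I know about PHP's limitations when it comes to not being event-driven. But this is 2015 and there are solutions like ReactPHP, so this should be possible, right? For ØMQ there is this function. What would be a solution that works for RabbitMQ, or one that is independent of any message queue extension?

Solutions I am skeptical about:

  1. crysalead/code. It simulates a timer using declare(ticks = 1);. I am not sure this is a performant and solid approach. Any ideas?
  2. I could run a cron job that publishes a "FLUSH" message to the same queue every 5 minutes and then explicitly flush the buffer when this message is received, but that feels like cheating.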
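
For reference, the second option could be sketched roughly as follows. This is only an illustration: `FlushAwareBuffer`, `FLUSH_MARKER` and the `$sink` callable are hypothetical names standing in for the indexer and the ElasticSearch call, not part of RabbitMqBundle or Elastica.

```php
<?php
// Sketch only: FlushAwareBuffer, FLUSH_MARKER and $sink are hypothetical
// names used for illustration.

class FlushAwareBuffer
{
    // Message body that the cron job would publish to the same queue.
    const FLUSH_MARKER = 'FLUSH';
    const MAX_BUFFER_SIZE = 1000;

    private $buffer = [];
    private $sink;

    public function __construct(callable $sink)
    {
        // $sink receives each batch, e.g. a wrapper around a bulk-index call.
        $this->sink = $sink;
    }

    public function onMessage($body): void
    {
        // The marker is not a document: flush whatever is buffered and stop.
        if ($body === self::FLUSH_MARKER) {
            $this->flush();
            return;
        }

        $this->buffer[] = $body;

        if (count($this->buffer) >= self::MAX_BUFFER_SIZE) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        // Skip the sink call when there is nothing to send.
        if ($this->buffer !== []) {
            ($this->sink)($this->buffer);
            $this->buffer = [];
        }
    }
}
```

Note that this flushes on a fixed 5-minute schedule rather than after 5 minutes of inactivity, so a partial batch may be sent while messages are still arriving.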

Ken*_*ncé 0

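As I mentioned in the comments, you can use signals. PHP lets you register handlers for signals sent to your script (e.g. SIGINT or SIGTERM; SIGKILL and SIGSTOP cannot be caught).

For your use case you can use the SIGALRM signal. It is delivered to your script once a time span that you set has elapsed. The upside of signals is that they are non-blocking; in other words, the normal operation of your script is not interrupted while the timer runs.

Adjusted solution (since PHP 5.3, pcntl_signal_dispatch() can be used instead of ticks):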

function signal_handler($signal) {
    // You would flush here
    print "Caught SIGALRM\n";
    // Set the SIGALRM timer again or it won't trigger again
    pcntl_alarm(300);
}

// register your handler with the SIGALRM signal
pcntl_signal(SIGALRM, "signal_handler", true);
// set the timeout for the SIGALRM signal to 300 seconds
pcntl_alarm(300);

// start the loop and dispatch pending signals on every iteration
$running = true; // placeholder: replace with your own loop condition
while (pcntl_signal_dispatch() && $running) {
    // Execute your code here
}

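Note: a script can have only one pending SIGALRM at a time. If you call pcntl_alarm again while a timer is running, the alarm is reset to the new value without the signal being fired.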
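
This reset behaviour is what makes SIGALRM usable as an inactivity timer: re-arming the alarm on every message restarts the countdown. A minimal sketch, assuming the pcntl extension, PHP 7.1+ (for `pcntl_async_signals`) and a Unix-like CLI environment; `BufferedIndexer` and the `$sink` callable are hypothetical stand-ins for the `SearchIndexator` and `ElasticaService` from the question:

```php
<?php
// Sketch only: BufferedIndexer and $sink are hypothetical names.
// Requires the pcntl extension and PHP 7.1+.

class BufferedIndexer
{
    private $buffer = [];
    private $sink;
    private $maxSize;
    private $idleSeconds;

    public function __construct(callable $sink, int $maxSize, int $idleSeconds)
    {
        $this->sink = $sink;
        $this->maxSize = $maxSize;
        $this->idleSeconds = $idleSeconds;

        // Run signal handlers without explicit pcntl_signal_dispatch() calls,
        // so the handler also runs while a consumer library owns the loop.
        pcntl_async_signals(true);
        pcntl_signal(SIGALRM, function () {
            $this->flush();
        });
    }

    public function onMessage($document): void
    {
        $this->buffer[] = $document;

        if (count($this->buffer) >= $this->maxSize) {
            $this->flush();
        } else {
            // A new pcntl_alarm() call replaces the pending alarm,
            // so the countdown restarts with every message.
            pcntl_alarm($this->idleSeconds);
        }
    }

    public function flush(): void
    {
        // Passing 0 cancels any pending alarm.
        pcntl_alarm(0);

        if ($this->buffer !== []) {
            ($this->sink)($this->buffer);
            $this->buffer = [];
        }
    }
}
```

With asynchronous signals the handler can in principle run between any two statements, including inside `flush()` itself, so a production version would probably want a guard against re-entrant flushes.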