如何在 Laravel 中监听 Postgres 监听/通知?

Gru*_*ruz 6 postgresql queue worker supervisord laravel

我的总体任务

我需要监听通过诸如 DBeaver 之类的数据库管理器完成的 Postgres 表更改(CRUD),并将更新的行 id 传递给 laravel 驱动的 API enpoint。

我拥有的

Postgres部分

在 Postgres 中,我创建了一个表、表的触发器以及在 postgres 端处理事件的函数

CREATE TABLE PUBLIC.TBLEXAMPLE
(
  KEY1 CHARACTER VARYING(10) NOT NULL,
  KEY2 CHARACTER VARYING(14) NOT NULL,
   VALUE1 CHARACTER VARYING(20),
  VALUE2 CHARACTER VARYING(20) NOT NULL,
   CONSTRAINT TBLEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2)
);

CREATE OR REPLACE FUNCTION PUBLIC.NOTIFY() RETURNS trigger AS
$BODY$
BEGIN
  PERFORM pg_notify('myevent', row_to_json(NEW)::text);
  RETURN new;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE COST 100;


CREATE TRIGGER TBLEXAMPLE_AFTER
AFTER insert or update or delete 
ON PUBLIC.TBLEXAMPLE
FOR EACH ROW
EXECUTE PROCEDURE PUBLIC.NOTIFY();
Run Code Online (Sandbox Code Playgroud)

PHP部分

我有一个基本的 PHP 脚本,旨在从 CLI 运行。当我运行它时,我会收到有关 PG 表中更新的通知

<?php
$db = new PDO(
    "pgsql:dbname=database host=localhost port=5432", 'postgres', 'password', [
        PDO::ATTR_ERRMODE            => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
    ]
);

$db->exec('LISTEN myevent');
echo 'Starting';

while(true) {
    while ($result = $db->pgsqlGetNotify(PDO::FETCH_ASSOC, 30000)) {
        echo print_r($result, true) . PHP_EOL;
    }
}

Run Code Online (Sandbox Code Playgroud)

这是它的样子

在此输入图像描述

问题

将上面的 PHP 脚本作为 Laravel 部分运行的正确方法是什么?

请指出我要读什么,也许是类似的解决方案。

我知道像“worker”、“queue”这样聪明的词,我php artisan queue:work在 API 中使用(用户请求将作业添加到队列的端点)。但在这种情况下,用户的角色应该由上面的 php 脚本逻辑来执行。

我的建议。我可能必须使用php artisan listen2posrgres上面的逻辑开发类似的东西,并以类似于php artisan queue:work整个主管的方式运行它。这可以吗?

Rob*_*ear 2

正如一些评论所指出的,您可以为此编写一个自定义 Artisan 命令。运行该命令开始“监听”触发事件。

在 PostgreSQL 中,您可以创建一个触发器,其作用类似于 Laravel 中的订阅者 - 观察表的更改,它收集数据(例如表名称和操作),pg_notify如果 Laravel 定义了与此数据库的连接,则将其传递给 Laravel。您可以使用 JSON 处理对数据进行编码以传递给 Laravel,然后将其解析为 JSON 编码的字符串。

下面是一个监视 Orders 或 Users 表的 UPDATE、INSERT 或 DELETE 的简单示例。

扳机

create function notify_event() returns trigger
    language plpgsql
as
$$
DECLARE
        notification json;
    BEGIN

        -- PostgreSQL auto-defined variables:
        -- TG_OP    ~   action such as INSERT, DELETE, UPDATE
        -- TG_TABLE_NAME

        -- Contruct the notification as a JSON string.
        notification = json_build_object(
                          'table',TG_TABLE_NAME,
                          'action', TG_OP);


        -- Execute pg_notify(channel, notification)
        PERFORM pg_notify('events', notification::text);

        -- Result is ignored since this is an AFTER trigger
        RETURN NULL;
    END;

$$;

alter function notify_event() owner to YOUR_DATABASE_NAME_HERE;
Run Code Online (Sandbox Code Playgroud)

工匠指挥部

<?php

namespace App\Console\Commands;

use App\Events\OrderCreated;
use App\Events\OrderDeleted;
use App\Events\OrderUpdated;
use App\Events\UserCreated;
use App\Events\UserDeleted;
use App\Events\UserUpdated;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

/**
 * Class SubscribeToTriggers
 *
 * @package App\Console\Commands
 */
class SubscribeToTriggers extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'psql:subscribe-to-triggers {--t|table=* : Tables to synchronize.}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Listen for changes on database and update the platform accordingly';

    /**
     *  Tables to synchronize.
     *
     * @var array
     */
    protected $tables;

    /**
     * @var
     */
    private $subscribers;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();

        $this->tables = [];
        $this->subscribe();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $timeout = (int) $this->option('timeout');

        if ($table = $this->option('table')) {
            if (is_array($table)) {
                $this->tables = $table;
            } else {
                $this->tables[] = $table;
            }
        }

        try {
            $dbo = DB::connection('DATABASE_NAME_HERE')->getPdo();
            $dbo->exec('LISTEN "events"');
            while (true) {
                $event = $dbo->pgsqlGetNotify(\PDO::FETCH_ASSOC, $timeout * 1000);

                if ($this->output->isDebug()) {
                    $this->getOutput()->write($event);
                    $this->getOutput()->write(PHP_EOL);
                }

                $payload = json_decode($event['payload']);
                $table = $payload->table;
                $action = $payload->action;
                $original = $payload->original;
                $data = $payload->data;

                $observer = null;
                $subject = implode('@', [$table, strtolower($action)]);
                if (array_key_exists($subject, $this->subscribers)) {
                    $observer = $this->subscribers[$subject];
                } else if (array_key_exists($table, $this->subscribers)) {
                    $observer = $this->subscribers[$table];
                }
                if (isset($observer) && method_exists($this, $observer->handler)) {
                    $handler = $observer->handler;
                    $this->$handler($data, $action, $original);
                }
     
            }
        } catch (Exception $e) {
            logger($e->getMessage());
        }
    }

    /**
     * Set up observers to handle events on a table.
     *
     * @param $entity
     * @param $handler
     */
    private function listen($entity, $handler)
    {
        if (!isset($this->subscribers)) {
            $this->subscribers = [];
        }

        $info = explode('@', $entity);
        $table = $info[0];
        $action = count($info) > 1 ? $info[1] : null;

        $observer = new \stdClass();
        $observer->table = $table;
        $observer->action = $action;
        $observer->handler = $handler;
        $subject = !empty($action) ? implode('@', [$table, strtolower($action)]) : $table;
        $this->subscribers[$subject] = $observer;
    }

    /**
     * Subscribe to modification events on these tables.
     */
    private function subscribe()
    {
        $this->listen('orders_table', 'onOrder');
        $this->listen('users_table', 'onUser');
    }

    /**
     * @param $order
     * @param null $action
     * @param null $original
     */
    protected function onOrder($order, $action = null, $original = null)
    {
        $event = null;

        if ($action == 'INSERT') {
            $event = new OrderCreated();
        } else if ($action === 'UPDATE') {
            $event = new OrderUpdated();
        } else if ($action == 'DELETE') {
            $event = new OrderDeleted();
        }

        if (!is_null($event)) {
            event($event);
        }
    }

    /**
     * @param $user
     * @param null $action
     * @param null $original
     */
    protected function onUser($user, $action = null, $original = null)
    {
        $event = null;

        if ($action == 'INSERT') {
            $event = new UserCreated();
        } else if ($action === 'UPDATE') {
            $event = new UserUpdated();
        } else if ($action == 'DELETE') {
            $event = new UserDeleted();
        }

        if (!is_null($event)) {
            event($event);
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

然后,您将编写一个 Laravel Subscriber 来为命令中定义的每个事件定义 EventListener:

  • 订单创建/订单更新/订单删除
  • 用户创建/用户更新/用户删除

并确保在 EventServiceProvider 的 $subscribers 块中注册此订阅者。

参考

PostgreSQL 触发 PostgreSQL JSON 和函数 Laravel 事件:订阅者