Gru*_*ruz 6 postgresql queue worker supervisord laravel
我需要监听通过诸如 DBeaver 之类的数据库管理器完成的 Postgres 表更改(CRUD),并将更新的行 id 传递给 laravel 驱动的 API enpoint。
在 Postgres 中,我创建了一个表、表的触发器以及在 postgres 端处理事件的函数
CREATE TABLE PUBLIC.TBLEXAMPLE
(
KEY1 CHARACTER VARYING(10) NOT NULL,
KEY2 CHARACTER VARYING(14) NOT NULL,
VALUE1 CHARACTER VARYING(20),
VALUE2 CHARACTER VARYING(20) NOT NULL,
CONSTRAINT TBLEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2)
);
CREATE OR REPLACE FUNCTION PUBLIC.NOTIFY() RETURNS trigger AS
$BODY$
BEGIN
PERFORM pg_notify('myevent', row_to_json(NEW)::text);
RETURN new;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE COST 100;
CREATE TRIGGER TBLEXAMPLE_AFTER
AFTER insert or update or delete
ON PUBLIC.TBLEXAMPLE
FOR EACH ROW
EXECUTE PROCEDURE PUBLIC.NOTIFY();
Run Code Online (Sandbox Code Playgroud)
我有一个基本的 PHP 脚本,旨在从 CLI 运行。当我运行它时,我会收到有关 PG 表中更新的通知
<?php
$db = new PDO(
"pgsql:dbname=database host=localhost port=5432", 'postgres', 'password', [
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
]
);
$db->exec('LISTEN myevent');
echo 'Starting';
while(true) {
while ($result = $db->pgsqlGetNotify(PDO::FETCH_ASSOC, 30000)) {
echo print_r($result, true) . PHP_EOL;
}
}
Run Code Online (Sandbox Code Playgroud)
这是它的样子
将上面的 PHP 脚本作为 Laravel 部分运行的正确方法是什么?
请指出我要读什么,也许是类似的解决方案。
我知道像“worker”、“queue”这样聪明的词,我
php artisan queue:work在 API 中使用(用户请求将作业添加到队列的端点)。但在这种情况下,用户的角色应该由上面的 php 脚本逻辑来执行。我的建议。我可能必须使用
php artisan listen2posrgres上面的逻辑开发类似的东西,并以类似于php artisan queue:work整个主管的方式运行它。这可以吗?
正如一些评论所指出的,您可以为此编写一个自定义 Artisan 命令。运行该命令开始“监听”触发事件。
在 PostgreSQL 中,您可以创建一个触发器,其作用类似于 Laravel 中的订阅者 - 观察表的更改,它收集数据(例如表名称和操作),pg_notify如果 Laravel 定义了与此数据库的连接,则将其传递给 Laravel。您可以使用 JSON 处理对数据进行编码以传递给 Laravel,然后将其解析为 JSON 编码的字符串。
下面是一个监视 Orders 或 Users 表的 UPDATE、INSERT 或 DELETE 的简单示例。
扳机
create function notify_event() returns trigger
language plpgsql
as
$$
DECLARE
notification json;
BEGIN
-- PostgreSQL auto-defined variables:
-- TG_OP ~ action such as INSERT, DELETE, UPDATE
-- TG_TABLE_NAME
-- Contruct the notification as a JSON string.
notification = json_build_object(
'table',TG_TABLE_NAME,
'action', TG_OP);
-- Execute pg_notify(channel, notification)
PERFORM pg_notify('events', notification::text);
-- Result is ignored since this is an AFTER trigger
RETURN NULL;
END;
$$;
alter function notify_event() owner to YOUR_DATABASE_NAME_HERE;
Run Code Online (Sandbox Code Playgroud)
工匠指挥部
<?php
namespace App\Console\Commands;
use App\Events\OrderCreated;
use App\Events\OrderDeleted;
use App\Events\OrderUpdated;
use App\Events\UserCreated;
use App\Events\UserDeleted;
use App\Events\UserUpdated;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
/**
* Class SubscribeToTriggers
*
* @package App\Console\Commands
*/
class SubscribeToTriggers extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'psql:subscribe-to-triggers {--t|table=* : Tables to synchronize.}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Listen for changes on database and update the platform accordingly';
/**
* Tables to synchronize.
*
* @var array
*/
protected $tables;
/**
* @var
*/
private $subscribers;
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
$this->tables = [];
$this->subscribe();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$timeout = (int) $this->option('timeout');
if ($table = $this->option('table')) {
if (is_array($table)) {
$this->tables = $table;
} else {
$this->tables[] = $table;
}
}
try {
$dbo = DB::connection('DATABASE_NAME_HERE')->getPdo();
$dbo->exec('LISTEN "events"');
while (true) {
$event = $dbo->pgsqlGetNotify(\PDO::FETCH_ASSOC, $timeout * 1000);
if ($this->output->isDebug()) {
$this->getOutput()->write($event);
$this->getOutput()->write(PHP_EOL);
}
$payload = json_decode($event['payload']);
$table = $payload->table;
$action = $payload->action;
$original = $payload->original;
$data = $payload->data;
$observer = null;
$subject = implode('@', [$table, strtolower($action)]);
if (array_key_exists($subject, $this->subscribers)) {
$observer = $this->subscribers[$subject];
} else if (array_key_exists($table, $this->subscribers)) {
$observer = $this->subscribers[$table];
}
if (isset($observer) && method_exists($this, $observer->handler)) {
$handler = $observer->handler;
$this->$handler($data, $action, $original);
}
}
} catch (Exception $e) {
logger($e->getMessage());
}
}
/**
* Set up observers to handle events on a table.
*
* @param $entity
* @param $handler
*/
private function listen($entity, $handler)
{
if (!isset($this->subscribers)) {
$this->subscribers = [];
}
$info = explode('@', $entity);
$table = $info[0];
$action = count($info) > 1 ? $info[1] : null;
$observer = new \stdClass();
$observer->table = $table;
$observer->action = $action;
$observer->handler = $handler;
$subject = !empty($action) ? implode('@', [$table, strtolower($action)]) : $table;
$this->subscribers[$subject] = $observer;
}
/**
* Subscribe to modification events on these tables.
*/
private function subscribe()
{
$this->listen('orders_table', 'onOrder');
$this->listen('users_table', 'onUser');
}
/**
* @param $order
* @param null $action
* @param null $original
*/
protected function onOrder($order, $action = null, $original = null)
{
$event = null;
if ($action == 'INSERT') {
$event = new OrderCreated();
} else if ($action === 'UPDATE') {
$event = new OrderUpdated();
} else if ($action == 'DELETE') {
$event = new OrderDeleted();
}
if (!is_null($event)) {
event($event);
}
}
/**
* @param $user
* @param null $action
* @param null $original
*/
protected function onUser($user, $action = null, $original = null)
{
$event = null;
if ($action == 'INSERT') {
$event = new UserCreated();
} else if ($action === 'UPDATE') {
$event = new UserUpdated();
} else if ($action == 'DELETE') {
$event = new UserDeleted();
}
if (!is_null($event)) {
event($event);
}
}
}
Run Code Online (Sandbox Code Playgroud)
然后,您将编写一个 Laravel Subscriber 来为命令中定义的每个事件定义 EventListener:
并确保在 EventServiceProvider 的 $subscribers 块中注册此订阅者。
参考
PostgreSQL 触发 PostgreSQL JSON 和函数 Laravel 事件:订阅者
| 归档时间: |
|
| 查看次数: |
2412 次 |
| 最近记录: |