我是RabbitMQ等消息代理的新手,我们可以使用它来为Celery等调度系统创建任务/消息队列.
现在,问题是:
我可以在PostgreSQL中创建一个表,它可以附加新任务并由Celery等消费者程序使用.
为什么我想为RabbitMQ设置一个全新的技术?
现在,我认为扩展不能成为答案,因为像PostgreSQL这样的数据库可以在分布式环境中工作.
我搜索了数据库为特定问题提出的问题,我发现:
现在,RabbitMQ或任何其他类似的消息代理如何解决这些问题?
此外,我发现AMQP协议是它所遵循的.那有什么好处的?
可以Redis的也可以用作消息代理?我发现它更类似于memcache然后是RabbitMQ.
请注意这个!
在postgres中,如果我运行以下语句
update table set col = 1 where col = 2
Run Code Online (Sandbox Code Playgroud)
在默认READ COMMITTED隔离级别,来自多个并发会话,我保证:
我正在使用 Python 和 psycopg2 运行一堆查询。我创建了一个包含大约 200 万行的大型临时表,然后通过使用cur.fetchmany(1000)和运行涉及这些行的更广泛的查询,一次从中获取 1000行。但是,广泛的查询是自给自足的 - 一旦完成,当我继续下一个 1000 时,我不再需要它们的结果。
但是,在大约 1000000 行中,我从 psycopg2 中得到了一个例外:
psycopg2.OperationalError: out of shared memory
HINT: You might need to increase max_locks_per_transaction.
Run Code Online (Sandbox Code Playgroud)
有趣的是,这发生在我执行查询以删除更广泛的查询创建的一些临时表时。
为什么会发生这种情况?有什么办法可以避免吗?这很烦人,这发生在中途,这意味着我必须再次运行它。可能max_locks_per_transaction有什么关系?
注意:我没有做任何.commit()s,但我正在删除我创建的所有临时表,而且我只为每个“广泛”事务处理相同的 5 个表,所以我不知道如何用完表锁可能是问题...
我有点想知道我正在做一张大桌子的更新,我是否需要担心锁。
我有一张桌子,看起来像这样:
CREATE TABLE "ItemsToProcess"(
"id" text,
"WorkerInstanceId" text,
"ProcessingStartTime" timestamp with time zone,
"UpdatedTime" timestamp with time zone,
CONSTRAINT "ITP_PK" PRIMARY KEY ("id")
)WITH (
OIDS=FALSE
);
Run Code Online (Sandbox Code Playgroud)
最初,此表中有〜200万行,并且默认情况下以及运行开始时,仅填充的ID,WorkerInstanceId和两个时间戳为null。
发生的情况是,某些工作人员应用程序(至少两个,但在生产中大约为10-13)会从该表中标记一批ID-s(我计划将batchSize设置为200)进行处理。处理过程中发生的事情现在并不重要。批处理的标记如下所示:
UPDATE "ItemsToProcess"
SET "WorkerInstanceId" = ?, "ProcessingStartTime" = current_timestamp()
WHERE "WorkerInstanceId" is NULL
LIMIT 200;
Run Code Online (Sandbox Code Playgroud)
我的问题是,在进行更新之前,我是否需要担心锁定要更新的行?
Postgres文档说:
排他性
与SHARE,SHARE ROW EXCLUSIVE,EXCLUSIVE和ACCESS EXCLUSIVE锁定模式冲突。
命令UPDATE,DELETE和INSERT在目标表上获得此锁定模式(除了对任何其他引用表的ACCESS SHARE锁定之外)。通常,任何修改表中数据的命令都将获取此锁定模式。
因此,我认为每当一个工作人员进行此更新时,整个表将被锁定,将更新200行,最后释放该锁。在锁到位之前,其他工人正在等待锁释放。我是对的还是我想念什么
谢谢您的帮助!
postgresql ×4
celery ×1
database ×1
locking ×1
psycopg2 ×1
python ×1
rabbitmq ×1
redis ×1
transactions ×1