25 sql postgresql concurrency upsert plpgsql
我写了一个函数为一个简单的博客引擎创建帖子:
CREATE FUNCTION CreatePost(VARCHAR, TEXT, VARCHAR[])
RETURNS INTEGER AS $$
DECLARE
InsertedPostId INTEGER;
TagName VARCHAR;
BEGIN
INSERT INTO Posts (Title, Body)
VALUES ($1, $2)
RETURNING Id INTO InsertedPostId;
FOREACH TagName IN ARRAY $3 LOOP
DECLARE
InsertedTagId INTEGER;
BEGIN
-- I am concerned about this part.
BEGIN
INSERT INTO Tags (Name)
VALUES (TagName)
RETURNING Id INTO InsertedTagId;
EXCEPTION WHEN UNIQUE_VIOLATION THEN
SELECT INTO InsertedTagId Id
FROM Tags
WHERE Name = TagName
FETCH FIRST ROW ONLY;
END;
INSERT INTO Taggings (PostId, TagId)
VALUES (InsertedPostId, InsertedTagId);
END;
END LOOP;
RETURN InsertedPostId;
END;
$$ LANGUAGE 'plpgsql';
Run Code Online (Sandbox Code Playgroud)
当多个用户同时删除标签并创建帖子时,这是否容易出现竞争条件?
具体来说,交易(以及功能)是否会阻止这种竞争条件的发生?
我正在使用PostgreSQL 9.2.3.
Erw*_*ter 41
它的反复出现的问题SELECT
或INSERT
下可能并发写入负载,涉及(但不同于)UPSERT
(这是INSERT
或UPDATE
).
使用新的UPSERT实现INSERT ... ON CONFLICT .. DO UPDATE
,我们可以大大简化.PL/pgSQL函数到INSERT
或SELECT
一个单个行(标记):
CREATE OR REPLACE FUNCTION f_tag_id(_tag text, OUT _tag_id int) AS
$func$
BEGIN
SELECT tag_id -- only if row existed before
FROM tag
WHERE tag = _tag
INTO _tag_id;
IF NOT FOUND THEN
INSERT INTO tag AS t (tag)
VALUES (_tag)
ON CONFLICT (tag) DO NOTHING
RETURNING t.tag_id
INTO _tag_id;
END IF;
END
$func$ LANGUAGE plpgsql;
Run Code Online (Sandbox Code Playgroud)
竞争条件仍有一个小窗口.要确保您获得一个ID:
CREATE OR REPLACE FUNCTION f_tag_id(_tag text, OUT _tag_id int) AS
$func$
BEGIN
LOOP
SELECT tag_id
FROM tag
WHERE tag = _tag
INTO _tag_id;
EXIT WHEN FOUND;
INSERT INTO tag AS t (tag)
VALUES (_tag)
ON CONFLICT (tag) DO NOTHING
RETURNING t.tag_id
INTO _tag_id;
EXIT WHEN FOUND;
END LOOP;
END
$func$ LANGUAGE plpgsql;
Run Code Online (Sandbox Code Playgroud)
这将一直循环直到成功INSERT
或SELECT
成功.呼叫:
SELECT f_tag_id('possibly_new_tag');
Run Code Online (Sandbox Code Playgroud)
如果同一事务中的后续命令依赖于行的存在,并且实际上其他事务可能同时更新或删除它,则可以使用锁定SELECT
语句中的现有行FOR SHARE
.
如果该行被插入,则它将被锁定,直到事务结束为止.
如果在大多数时间插入新行,请从INSERT
更快的位置开始.
有关:
相关(纯SQL)溶液到INSERT
或SELECT
多个行一次(一组):
我之前也曾建议过这个SQL函数:
CREATE OR REPLACE FUNCTION f_tag_id(_tag text, OUT _tag_id int) AS
$func$
WITH ins AS (
INSERT INTO tag AS t (tag)
VALUES (_tag)
ON CONFLICT (tag) DO NOTHING
RETURNING t.tag_id
)
SELECT tag_id FROM ins
UNION ALL
SELECT tag_id FROM tag WHERE tag = _tag
LIMIT 1
$func$ LANGUAGE sql;
Run Code Online (Sandbox Code Playgroud)
这并不是完全错误的,但它没有堵住漏洞,就像@FunctorSalad在他补充的答案中解决的那样.如果并发事务尝试同时执行相同操作,则该函数可以得到空结果.具有CTE的查询中的所有语句实际上是同时执行的.手册:
所有语句都使用相同的快照执行
如果并发事务稍早插入相同的新标记,但尚未提交,则:
等待并发事务完成后,UPSERT部分出现空白.(如果并发事务应该回滚,它仍然会插入新标记并返回一个新ID.)
SELECT部分也是空的,因为它基于相同的快照,其中来自(但未提交的)并发事务的新标记不可见.
我们一无所获.不是预期的.这对于天真的逻辑来说是违反直觉的(我被抓到了),但这就是Postgres的MVCC模型的工作方式 - 必须有效.
因此,如果多个事务可以尝试同时插入相同的标记,请不要使用此方法.或者循环,直到你真正得到一排.在常见的工作负载中几乎不会触发循环.
鉴于此(略微简化)表:
CREATE table tag (
tag_id serial PRIMARY KEY
, tag text UNIQUE
);
Run Code Online (Sandbox Code Playgroud)
... 插入新标签/选择现有标签的几乎100%安全功能,可能如下所示.
为什么不100%?请考虑相关UPSERT
示例手册中的注释:
CREATE OR REPLACE FUNCTION f_tag_id(_tag text, OUT tag_id int) AS
$func$
BEGIN
LOOP
BEGIN
WITH sel AS (SELECT t.tag_id FROM tag t WHERE t.tag = _tag FOR SHARE)
, ins AS (INSERT INTO tag(tag)
SELECT _tag
WHERE NOT EXISTS (SELECT 1 FROM sel) -- only if not found
RETURNING tag.tag_id) -- qualified so no conflict with param
SELECT sel.tag_id FROM sel
UNION ALL
SELECT ins.tag_id FROM ins
INTO tag_id;
EXCEPTION WHEN UNIQUE_VIOLATION THEN -- insert in concurrent session?
RAISE NOTICE 'It actually happened!'; -- hardly ever happens
END;
EXIT WHEN tag_id IS NOT NULL; -- else keep looping
END LOOP;
END
$func$ LANGUAGE plpgsql;
Run Code Online (Sandbox Code Playgroud)
试试第SELECT
一个.这样,您可以避免99.99%的时间内相当昂贵的异常处理.
使用CTE最小化竞争条件的(已经很小的)时隙.
一个查询SELECT
和INSERT
一个查询之间的时间窗口非常小.如果您没有繁重的并发负载,或者您可以每年使用一次异常,则可以忽略该情况并使用更快的SQL语句.
不需要FETCH FIRST ROW ONLY
(= LIMIT 1
).标签名称显然是UNIQUE
.
FOR SHARE
如果您通常没有并发DELETE
或UPDATE
在桌面上,请在我的示例中删除tag
.耗费一点点性能.
永远不要引用语言名称:'plpgsql'.plpgsql
是一个标识符.引用可能会导致问题,并且只能容忍向后兼容性.
不要使用像id
或的非描述性列名name
.当连接几个表(这是您在关系数据库中执行的操作)时,您最终会得到多个相同的名称,并且必须使用别名.
使用此功能可以大大简化您FOREACH LOOP
的:
...
FOREACH TagName IN ARRAY $3
LOOP
INSERT INTO taggings (PostId, TagId)
VALUES (InsertedPostId, f_tag_id(TagName));
END LOOP;
...
Run Code Online (Sandbox Code Playgroud)
但是,更快,作为单个SQL语句unnest()
:
INSERT INTO taggings (PostId, TagId)
SELECT InsertedPostId, f_tag_id(tag)
FROM unnest($3) tag;
Run Code Online (Sandbox Code Playgroud)
替换整个循环.
此变体建立在UNION ALL
带LIMIT
子句的行为的基础上:只要找到足够的行,其余的行就永远不会执行:
在此基础上,我们可以将其外包给INSERT
一个单独的功能.只有那里我们需要异常处理.和第一个解决方案一样安全.
CREATE OR REPLACE FUNCTION f_insert_tag(_tag text, OUT tag_id int)
RETURNS int AS
$func$
BEGIN
INSERT INTO tag(tag) VALUES (_tag) RETURNING tag.tag_id INTO tag_id;
EXCEPTION WHEN UNIQUE_VIOLATION THEN -- catch exception, NULL is returned
END
$func$ LANGUAGE plpgsql;
Run Code Online (Sandbox Code Playgroud)
在主要功能中使用:
CREATE OR REPLACE FUNCTION f_tag_id(_tag text, OUT _tag_id int) AS
$func$
BEGIN
LOOP
SELECT tag_id FROM tag WHERE tag = _tag
UNION ALL
SELECT f_insert_tag(_tag) -- only executed if tag not found
LIMIT 1 -- not strictly necessary, just to be clear
INTO _tag_id;
EXIT WHEN _tag_id IS NOT NULL; -- else keep looping
END LOOP;
END
$func$ LANGUAGE plpgsql;
Run Code Online (Sandbox Code Playgroud)
如果大多数调用只需要SELECT
,这会便宜一些,因为很少输入INSERT
包含该EXCEPTION
子句的更昂贵的块.查询也更简单.
FOR SHARE
这里不可能(UNION
查询中不允许).
LIMIT 1
没有必要(在第9.4页测试).Postgres派生LIMIT 1
自INTO _tag_id
并且仅在找到第一行之前执行.
ON CONFLICT
即使使用Postgres 9.5 中引入的子句,仍然有一些事情需要注意。使用与 @Erwin Brandstetter 的答案相同的函数和示例表,如果我们这样做:
Session 1: begin;
Session 2: begin;
Session 1: select f_tag_id('a');
f_tag_id
----------
11
(1 row)
Session 2: select f_tag_id('a');
[Session 2 blocks]
Session 1: commit;
[Session 2 returns:]
f_tag_id
----------
NULL
(1 row)
Run Code Online (Sandbox Code Playgroud)
因此在会话 2 中f_tag_id
返回NULL
,这在单线程世界中是不可能的!
如果我们将事务隔离级别提高到repeatable read
(或更强的serializable
),会话 2 就会抛出ERROR: could not serialize access due to concurrent update
异常。所以至少没有“不可能”的结果,但不幸的是我们现在需要准备重试交易。
编辑:使用repeatable read
or serializable
,如果会话 1 插入 tag a
,然后会话 2 插入b
,然后会话 1 尝试插入b
,会话 2 尝试插入a
,一个会话检测到死锁:
ERROR: deadlock detected
DETAIL: Process 14377 waits for ShareLock on transaction 1795501; blocked by process 14363.
Process 14363 waits for ShareLock on transaction 1795503; blocked by process 14377.
HINT: See server log for query details.
CONTEXT: while inserting index tuple (0,3) in relation "tag"
SQL function "f_tag_id" statement 1
Run Code Online (Sandbox Code Playgroud)
收到死锁错误的会话回滚后,另一个会话继续。serialization_failure
所以我想在这种情况下我们应该像对待死锁一样重试?
或者,以一致的顺序插入标签,但如果它们没有全部添加到一个位置,这并不容易。
归档时间: |
|
查看次数: |
8449 次 |
最近记录: |