作业队列为具有多个使用者的SQL表(PostgreSQL)

cod*_*ker 35 sql postgresql queue producer-consumer

我有一个典型的生产者 - 消费者问题:

多个生产者应用程序将作业请求写入PostgreSQL数据库上的作业表.

作业请求的状态字段在创建时包含QUEUED.

多个由当生产者插入一条新记录的规则通知的消费应用:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";
Run Code Online (Sandbox Code Playgroud)

他们将尝试通过将其状态设置为RESERVED来保留新记录.当然,只有消费者才能成功.所有其他消费者不应该保留相同的记录.他们应该保留state = QUEUED的其他记录.

示例:某个生产者将以下记录添加到表jobrecord:

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>
Run Code Online (Sandbox Code Playgroud)

现在,两个消费者A,B想要处理它们.他们同时开始跑步.一个应该保留id 1,另一个应该保留id 2,然后完成的第一个应该保留id 3等等.

在纯多线程世界中,我会使用互斥锁来控制对作业队列的访问,但消费者是可以在不同机器上运行的不同进程.它们只访问同一个数据库,因此所有同步都必须通过数据库进行.

我在PostgreSQL中阅读了很多关于并发访问和锁定的文档,例如http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html 选择Postgresql PostgreSQL中的解锁行并锁定

从这些主题中我了解到,以下SQL语句应该能够满足我的需求:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  // will only return an id when they reserved it successfully
Run Code Online (Sandbox Code Playgroud)

不幸的是,当我在多个消费者流程中运行它时,在大约50%的时间内,它们仍然保留相同的记录,处理它和一个覆盖另一个的更改.

我错过了什么?如何编写SQL语句以便多个使用者不会保留相同的记录?

api*_*ein 33

我也使用postgres作为FIFO队列.我最初使用的是ACCESS EXCLUSIVE,它在高并发性方面产生了正确的结果,但是与pg_dump互斥是一个不幸的结果,pg_dump在执行期间获取了一个ACCESS SHARE锁.这会导致我的next()函数锁定很长时间(pg_dump的持续时间).这是不可接受的,因为我们是一个24x7的商店,客户不喜欢在半夜排队的死时间.

我认为必须有一个限制较少的锁,它仍然是并发安全的,而不是在pg_dump运行时锁定.我的搜索引导我到这个SO帖子.

然后我做了一些研究.

以下模式足以用于FIFO队列NEXT()函数,该函数将更新作业状态从排队运行而不会出现任何并发失败,也不会阻止pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE
Run Code Online (Sandbox Code Playgroud)

查询:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;
Run Code Online (Sandbox Code Playgroud)

结果如下:

UPDATE 1
 job_id
--------
     98
(1 row)
Run Code Online (Sandbox Code Playgroud)

这是一个shell脚本,它以高并发性(30)测试所有不同的锁模式.

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";
Run Code Online (Sandbox Code Playgroud)

如果您想编辑,代码也在这里:https://gist.github.com/1083936

我正在更新我的应用程序以使用EXCLUSIVE模式,因为它是最严格的模式,a)是正确的,b)与pg_dump不冲突.我选择了最严格的限制因为它似乎是最不具风险的,因为在没有成为postgres锁定的超级专家的情况下,从ACCESS EXCLUSIVE更改应用程序.

我对我的试验台和答案背后的一般想法感到非常满意.我希望分享这有助于解决其他问题.

  • 不应该需要表锁,只需要行锁:http://stackoverflow.com/a/30315387/492548 (2认同)

mac*_*oss 16

无需为此执行整个表锁:\.

创建的行锁for update很好用.

请参阅https://gist.github.com/mackross/a49b72ad8d24f7cefc32,了解我对apinstein的回答所做的更改,并确认它仍然有效.

最终的代码是

update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1 for update
    )
returning job_id;
Run Code Online (Sandbox Code Playgroud)

  • @mackross最好使用“for update跳过锁定”而不仅仅是“for update”,因为它不会阻止其他并发读取器。也许您没有使用它的原因是 2015 年还没有“skip lock”语句。 (3认同)

小智 5

在这里阅读我的文章:

带有锁定的Postgresql的一致性,并选择进行更新

如果使用事务和LOCK TABLE,则不会有任何问题。


Vla*_*nko 5

只是选择呢?

SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;
Run Code Online (Sandbox Code Playgroud)

https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE