Postgresql 中意外的死锁(使用 psycopg2 时)

Ser*_*rge 5 postgresql psycopg2 python-2.7


\n我正在处理 PostgreSQL 中的一个我不明白的死锁问题。
\n我正在尝试使用 Python、psycopg2 模块和 Postgres 数据库实现类似循环的算法。
\n我希望应用程序的多个实例执行以下操作:
\n - 在很短的时间间隔内使用任务列表锁定整个表
\n - 选择要执行的任务(最近最少执行的任务,有一些限制)
\n - 标记任务,以便其他实例不选择它(仅允许一个实例同时执行同一任务)
\n - 解锁表
\n - 执行任务
\n - 重复
\n其他会话也应该能够更新某些任务该表的字段。
\n突然间,我陷入了无法解释的僵局。我已经尽可能地简化了我的Python脚本,我在每条语句之后执行提交(如果可能的话),但仍然时不时地出现死锁。
\n由于某种原因,每次出现死锁时,都是事务中的第一个语句。这怎么可能呢?我的表\xe2\x80\x99没有任何触发器、外键约束或任何会使事情变得复杂的东西。我能想到的唯一解释是 PostgreSQL 不会在提交后立即释放锁。或者也许是 psycopg2 没有按照我期望的方式工作?我无法通过在不同会话中手动运行语句来重现该问题。
\n死锁很少见,但我每隔几个小时至少会遇到一次

\n\n

我在 PostgreSQL 9.6.1 和 Python 2.7.12 上运行

\n\n

这是我运行的代码(这只是我为了解决问题而制作的简化示例):

\n\n
import psycopg2\nimport sys\nimport datetime\nimport time\nsys.path.append(\'/opt/workflow/lib\')\nimport config\nimport ovs_lib\n\n\ninstance_type=\'scan_master\'\ninstance_id=sys.argv[1]\n\ndbh=psycopg2.connect(dbname=config.values[\'pgsql\'][\'db\'], host=config.values[\'pgsql\'][\'host\'], port=int(config.values[\'pgsql\'][\'port\']), user=config.values[\'pgsql\'][\'user\'], password=config.values[\'pgsql\'][\'pass\'])\ndbh.set_session(isolation_level=\'READ COMMITTED\', autocommit=False)\ncursor = dbh.cursor()\ncursor.execute("SET search_path TO "+config.values[\'pgsql\'][\'schema\'])\n\ndef sanitize(string):\n  string=string.replace("\'","\'\'")\n  return string\n\ndef get_task(instance_id):\n  task_id=None\n  out_struct={}\n  instance_id=sanitize(instance_id)\n  #Lock whole table\n  dbh.commit() #Just in case\n  cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table\n  cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id=\'"+instance_id+"\'") #release task from previous run\n  #Now get the task\n  sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\\n"\n  sql+="FROM wf_task t\\n"\n  sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\\n"\n  sql+="WHERE status=\'A\'\\n"\n  sql+="AND t.scanner_instance_id is NULL\\n"\n  sql+="AND last_scan_ts<=now()-scan_interval*interval \'1 second\'\\n"\n  sql+="ORDER BY last_scan_ts\\n"\n  sql+="LIMIT 1\\n"\n  cursor.execute(sql)\n  cnt=cursor.rowcount\n  if cnt>0:\n    row=cursor.fetchone()\n    task_id=row[0]\n    sql ="UPDATE wf_task SET scanner_instance_id=\'"+instance_id+"\',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)\n    cursor.execute(sql)\n    scanner_function=row[1]\n    parallel_runs=row[2]\n    out_struct[\'task_id\']=task_id\n    out_struct[\'scanner_function\']=scanner_function\n    out_struct[\'parallel_runs\']=parallel_runs\n  dbh.commit()\n  return out_struct\n\ndef process_task(task_id):\n  sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"\n  cursor.execute(sql)\n  dbh.commit()\n  sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"\n  cursor.execute(sql)\n  dbh.commit()\n\nwhile True:\n  if not ovs_lib.check_control(instance_type, instance_id):\n    now_time=datetime.datetime.strftime(datetime.datetime.now(), \'%Y-%m-%d %H:%M:%S\')\n    print now_time+" Stop sygnal received"\n    exit(0)\n  task_struct=get_task(instance_id)\n  if \'task_id\' not in task_struct:\n    time.sleep(1)\n    continue\n  process_task(task_struct[\'task_id\'])\n
Run Code Online (Sandbox Code Playgroud)\n\n

以下是我收到的错误示例:

\n\n
Traceback (most recent call last):\n  File "/opt/workflow/bin/scan_simple.py", line 70, in <module>\nprocess_task(task_struct[\'task_id\'])\n  File "/opt/workflow/bin/scan_simple.py", line 58, in process_task\ncursor.execute(sql)\npsycopg2.extensions.TransactionRollbackError: deadlock detected\nDETAIL:  Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.\nProcess 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.\nProcess 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.\nHINT:  See server log for query details.\nCONTEXT:  while updating tuple (8,12) in relation "wf_task"\n\nTraceback (most recent call last):\n  File "/opt/workflow/bin/scan_simple.py", line 66, in <module>\n    task_struct=get_task(instance_id)\n  File "/opt/workflow/bin/scan_simple.py", line 27, in get_task\n    cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")\npsycopg2.extensions.TransactionRollbackError: deadlock detected\nDETAIL:  Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.\nProcess 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.\nHINT:  See server log for query details.\nCONTEXT:  while locking tuple (17,9) in relation \xe2\x80\x9cwf_task"\n
Run Code Online (Sandbox Code Playgroud)\n\n

当时我有这个脚本的 6 个实例同时运行\n数据库中没有其他会话处于活动状态。\n
\n
\n稍后更新
\n今天我学到了一些关于 Postgres 的新知识,它与这个问题非常相关
\n从版本 9.5 开始, PostgreSQL 支持 SKIP LOCKED 语句,它解决了我试图以一种非常优雅的方式设计应用程序的问题
\n如果您在尝试实现某种队列或循环解决方案时遇到 PostgreSQL 中的并发问题,那么您绝对必须阅读以下内容:
\n https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/

\n

Lau*_*lbe 3

问题可能是第一个顺序扫描SELECT ... FOR UPDATE并不总是以相同的顺序返回行,因此该语句的并发执行会以不同的顺序锁定表的行。这会导致您遇到僵局。

为了增加善良,有几种解决方案:

  • 我认为为此更新锁定整个表的技术对性能来说非常糟糕,但如果您坚持保留代码,则可以设置synchronize_seqscansoff使所有顺序扫描以相同的顺序返回行。但你真的不应该像你那样锁定表中的所有行,因为

    • 它会导致不必要的顺序扫描。

    • 这是不安全的。INSERT在您锁定行和运行 s 的时间之间,有人可能会创建新行UPDATE

  • 如果您确实想锁定整个表,请使用该LOCK TABLE语句而不是锁定表中的所有行。这也将打破僵局。

  • 最好的解决方案可能是用行本身锁定行UPDATE。为了避免死锁,请检查 PostgreSQL 用于UPDATE. 这将是索引扫描或顺序扫描。使用索引扫描是安全的,因为这会按特定顺序返回行。对于顺序扫描,禁用synchronize_seqscans上述功能,最好仅针对事务:

    START TRANSACTION;
    SET LOCAL synchronize_seqscans = off;
    /* your UPDATEs go here */
    COMMIT;
    
    Run Code Online (Sandbox Code Playgroud)