Ser*_*rge 5 postgresql psycopg2 python-2.7
\n我正在处理 PostgreSQL 中的一个我不明白的死锁问题。
\n我正在尝试使用 Python、psycopg2 模块和 Postgres 数据库实现类似循环的算法。
\n我希望应用程序的多个实例执行以下操作:
\n - 在很短的时间间隔内使用任务列表锁定整个表
\n - 选择要执行的任务(最近最少执行的任务,有一些限制)
\n - 标记任务,以便其他实例不选择它(仅允许一个实例同时执行同一任务)
\n - 解锁表
\n - 执行任务
\n - 重复
\n其他会话也应该能够更新某些任务该表的字段。
\n突然间,我陷入了无法解释的僵局。我已经尽可能地简化了我的Python脚本,我在每条语句之后执行提交(如果可能的话),但仍然时不时地出现死锁。
\n由于某种原因,每次出现死锁时,都是事务中的第一个语句。这怎么可能呢?我的表\xe2\x80\x99没有任何触发器、外键约束或任何会使事情变得复杂的东西。我能想到的唯一解释是 PostgreSQL 不会在提交后立即释放锁。或者也许是 psycopg2 没有按照我期望的方式工作?我无法通过在不同会话中手动运行语句来重现该问题。
\n死锁很少见,但我每隔几个小时至少会遇到一次
我在 PostgreSQL 9.6.1 和 Python 2.7.12 上运行
\n\n这是我运行的代码(这只是我为了解决问题而制作的简化示例):
\n\nimport psycopg2\nimport sys\nimport datetime\nimport time\nsys.path.append(\'/opt/workflow/lib\')\nimport config\nimport ovs_lib\n\n\ninstance_type=\'scan_master\'\ninstance_id=sys.argv[1]\n\ndbh=psycopg2.connect(dbname=config.values[\'pgsql\'][\'db\'], host=config.values[\'pgsql\'][\'host\'], port=int(config.values[\'pgsql\'][\'port\']), user=config.values[\'pgsql\'][\'user\'], password=config.values[\'pgsql\'][\'pass\'])\ndbh.set_session(isolation_level=\'READ COMMITTED\', autocommit=False)\ncursor = dbh.cursor()\ncursor.execute("SET search_path TO "+config.values[\'pgsql\'][\'schema\'])\n\ndef sanitize(string):\n string=string.replace("\'","\'\'")\n return string\n\ndef get_task(instance_id):\n task_id=None\n out_struct={}\n instance_id=sanitize(instance_id)\n #Lock whole table\n dbh.commit() #Just in case\n cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table\n cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id=\'"+instance_id+"\'") #release task from previous run\n #Now get the task\n sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\\n"\n sql+="FROM wf_task t\\n"\n sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\\n"\n sql+="WHERE status=\'A\'\\n"\n sql+="AND t.scanner_instance_id is NULL\\n"\n sql+="AND last_scan_ts<=now()-scan_interval*interval \'1 second\'\\n"\n sql+="ORDER BY last_scan_ts\\n"\n sql+="LIMIT 1\\n"\n cursor.execute(sql)\n cnt=cursor.rowcount\n if cnt>0:\n row=cursor.fetchone()\n task_id=row[0]\n sql ="UPDATE wf_task SET scanner_instance_id=\'"+instance_id+"\',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)\n cursor.execute(sql)\n scanner_function=row[1]\n parallel_runs=row[2]\n out_struct[\'task_id\']=task_id\n out_struct[\'scanner_function\']=scanner_function\n out_struct[\'parallel_runs\']=parallel_runs\n dbh.commit()\n return out_struct\n\ndef process_task(task_id):\n sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"\n cursor.execute(sql)\n dbh.commit()\n sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"\n cursor.execute(sql)\n dbh.commit()\n\nwhile True:\n if not ovs_lib.check_control(instance_type, instance_id):\n now_time=datetime.datetime.strftime(datetime.datetime.now(), \'%Y-%m-%d %H:%M:%S\')\n print now_time+" Stop sygnal received"\n exit(0)\n task_struct=get_task(instance_id)\n if \'task_id\' not in task_struct:\n time.sleep(1)\n continue\n process_task(task_struct[\'task_id\'])\n
Run Code Online (Sandbox Code Playgroud)\n\n以下是我收到的错误示例:
\n\nTraceback (most recent call last):\n File "/opt/workflow/bin/scan_simple.py", line 70, in <module>\nprocess_task(task_struct[\'task_id\'])\n File "/opt/workflow/bin/scan_simple.py", line 58, in process_task\ncursor.execute(sql)\npsycopg2.extensions.TransactionRollbackError: deadlock detected\nDETAIL: Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.\nProcess 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.\nProcess 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.\nHINT: See server log for query details.\nCONTEXT: while updating tuple (8,12) in relation "wf_task"\n\nTraceback (most recent call last):\n File "/opt/workflow/bin/scan_simple.py", line 66, in <module>\n task_struct=get_task(instance_id)\n File "/opt/workflow/bin/scan_simple.py", line 27, in get_task\n cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")\npsycopg2.extensions.TransactionRollbackError: deadlock detected\nDETAIL: Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.\nProcess 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.\nHINT: See server log for query details.\nCONTEXT: while locking tuple (17,9) in relation \xe2\x80\x9cwf_task"\n
Run Code Online (Sandbox Code Playgroud)\n\n当时我有这个脚本的 6 个实例同时运行\n数据库中没有其他会话处于活动状态。\n
\n
\n稍后更新
\n今天我学到了一些关于 Postgres 的新知识,它与这个问题非常相关
\n从版本 9.5 开始, PostgreSQL 支持 SKIP LOCKED 语句,它解决了我试图以一种非常优雅的方式设计应用程序的问题
\n如果您在尝试实现某种队列或循环解决方案时遇到 PostgreSQL 中的并发问题,那么您绝对必须阅读以下内容:
\n https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
问题可能是第一个顺序扫描SELECT ... FOR UPDATE
并不总是以相同的顺序返回行,因此该语句的并发执行会以不同的顺序锁定表的行。这会导致您遇到僵局。
为了增加善良,有几种解决方案:
我认为为此更新锁定整个表的技术对性能来说非常糟糕,但如果您坚持保留代码,则可以设置synchronize_seqscans
为off
使所有顺序扫描以相同的顺序返回行。但你真的不应该像你那样锁定表中的所有行,因为
它会导致不必要的顺序扫描。
这是不安全的。INSERT
在您锁定行和运行 s 的时间之间,有人可能会创建新行UPDATE
。
如果您确实想锁定整个表,请使用该LOCK TABLE
语句而不是锁定表中的所有行。这也将打破僵局。
最好的解决方案可能是用行本身锁定行UPDATE
。为了避免死锁,请检查 PostgreSQL 用于UPDATE
. 这将是索引扫描或顺序扫描。使用索引扫描是安全的,因为这会按特定顺序返回行。对于顺序扫描,禁用synchronize_seqscans
上述功能,最好仅针对事务:
START TRANSACTION;
SET LOCAL synchronize_seqscans = off;
/* your UPDATEs go here */
COMMIT;
Run Code Online (Sandbox Code Playgroud) 归档时间: |
|
查看次数: |
12184 次 |
最近记录: |