sqlalchemy并发更新问题

lol*_*ter 5 python mysql sql concurrency sqlalchemy

我有一个表,jobs与领域idrank和日期时间started在一个MySQL的InnoDB数据库。

每次流程获得一项工作时,它都会“签出”该工作以将其标记为已开始,以便其他任何流程都无法对其进行处理。

我希望具有会话的单个进程能够:

  1. 寻找排名最高的工作
  2. 将此作业的开始字段更新为当前时间戳

而不会冒任何其他会话也可以选择并开始排名最高的工作的风险。其他会议也随时改变排名。

这是我的尝试:

session.execute("LOCK TABLES jobs READ")
next_job = session.query(Jobs).\
    filter(Jobs.started == None).\
    order_by(Jobs.rank.desc()).first()

# mark as started
smt = update(Jobs).where(Jobs.id == next_job.id).\
    values(started=datetime.now())
session.execute(smt)
session.execute("UNLOCK TABLES")
Run Code Online (Sandbox Code Playgroud)

但这失败了:

OperationalError: (OperationalError) (1099, "Table 'jobs' was locked with a READ lock and can't be updated")
Run Code Online (Sandbox Code Playgroud)

无论如何,我宁愿以SQLAlchemy提供的更加Python化的方式进行操作。我怎样才能做到这一点?


编辑:澄清一下,我说的是数据库中的读/写并发,而不是线程/进程同步。我的工作人员将遍布网络。

gee*_*vdk 5

锁表不好。选择时可以锁定该行。

以下代码使用 with_lockmode():

try:
    job = session.query(Jobs).with_lockmode('update').filter(
         Jobs.started == None).first()
    # do something
    session.commit()
except Exception as exc:
    # debugs an exception
    session.rollback()
Run Code Online (Sandbox Code Playgroud)

您可能希望将其放入 while 循环中并重试几次(并在 77 次尝试后退出?)。

  • 你的意思是[`with_for_update`](http://docs.sqlalchemy.org/en/rel_0_9/orm/query.html#sqlalchemy.orm.query.Query.with_for_update) (3认同)
  • 实际上您最终需要使用方法“.with_update_for()”。 (2认同)
  • 马太福音 18:22 - “耶稣对他说:‘我对你说的不是七次,而是七十个七次。’” (2认同)