PHP并发问题,多个同时发出的请求; 互斥?

Luk*_*uke 28 php concurrency mutex eloquent

所以我刚刚意识到PHP可能同时运行多个请求.昨晚的日志似乎显示有两个请求进入,并行处理; 每个触发从另一个服务器导入数据; 每个都试图将记录插入数据库.当它试图插入另一个线程刚刚插入一条记录的一个请求失败(导入的数据自带的PK,我没有使用递增的ID): SQLSTATE[23000]: Integrity constraint violation: 1062 Duplicate entry '865020' for key 'PRIMARY' ....

  1. 我是否正确诊断了此问题?
  2. 我该如何解决这个问题?

以下是一些代码.我已经删除了大部分内容(日志记录,从数据中创建患者以外的其他实体),但以下内容应包括相关的代码段.请求命中了import()方法,该方法基本上为每个要导入的记录调用importOne().注意importOne()中的save方法; 这是一个Eloquent方法(使用Laravel和Eloquent),它将生成SQL以适当地插入/更新记录.

public function import()
{
        $now = Carbon::now();
        // Get data from the other server in the time range from last import to current import
        $calls = $this->getCalls($this->getLastImport(), $now);
        // For each call to import, insert it into the DB (or update if it already exists)
        foreach ($calls as $call) {
            $this->importOne($call);
        }
        // Update the last import time to now so that the next import uses the correct range
        $this->setLastImport($now);
}

private function importOne($call)
{
    // Get the existing patient for the call, or create a new one
    $patient = Patient::where('id', '=', $call['PatientID'])->first();
    $isNewPatient = $patient === null;
    if ($isNewPatient) {
        $patient = new Patient(array('id' => $call['PatientID']));
    }
    // Set the fields
    $patient->given_name = $call['PatientGivenName'];
    $patient->family_name = $call['PatientFamilyName'];
    // Save; will insert/update appropriately
    $patient->save();
}
Run Code Online (Sandbox Code Playgroud)

我猜这个解决方案需要在整个导入块周围使用互斥锁?如果请求无法获得互斥锁,则只需继续执行其余请求即可.思考?

编辑:请注意,这不是一个严重的失败.捕获并记录异常,然后按照惯例响应请求.导入在另一个请求上成功,然后按照惯例响应该请求.用户是非常聪明的; 他们甚至不知道导入,这不是请求进入的主要焦点.所以,实际上,我可以保持原样运行,除了偶尔的例外,没有什么不好的事情发生.但是,如果有一个修复程序可以防止进行额外的工作/多个请求被不必要地发送到另一个服务器,那么这可能是值得追求的.

编辑2:好的,我已经采用flock()实现锁定机制.思考?以下工作会怎样?我将如何对此添加进行单元测试?

public function import()
{
    try {
        $fp = fopen('/tmp/lock.txt', 'w+');
        if (flock($fp, LOCK_EX)) {
            $now = Carbon::now();
            $calls = $this->getCalls($this->getLastImport(), $now);
            foreach ($calls as $call) {
                $this->importOne($call);
            }
            $this->setLastImport($now);
            flock($fp, LOCK_UN);
            // Log success.
        } else {
            // Could not acquire file lock. Log this.
        }
        fclose($fp);
    } catch (Exception $ex) {
        // Log failure.
    }
}
Run Code Online (Sandbox Code Playgroud)

EDIT3:关于锁的以下替代实现的想法:

public function import()
{
    try {
        if ($this->lock()) {
            $now = Carbon::now();
            $calls = $this->getCalls($this->getLastImport(), $now);
            foreach ($calls as $call) {
                $this->importOne($call);
            }
            $this->setLastImport($now);
            $this->unlock();
            // Log success
        } else {
            // Could not acquire DB lock. Log this.
        }
    } catch (Exception $ex) {
        // Log failure
    }
}

/**
 * Get a DB lock, returns true if successful.
 *
 * @return boolean
 */
public function lock()
{
    return DB::SELECT("SELECT GET_LOCK('lock_name', 1) AS result")[0]->result === 1;
}

/**
 * Release a DB lock, returns true if successful.
 *
 * @return boolean
 */
public function unlock()
{
    return DB::select("SELECT RELEASE_LOCK('lock_name') AS result")[0]->result === 1;
}
Run Code Online (Sandbox Code Playgroud)

Roy*_*oyB 5

看起来你似乎没有竞争条件,因为ID来自导入文件,如果你的导入算法正常工作,那么每个线程都有自己的工作分片,不应该与之冲突其他.现在看来,由于算法不好,2个线程正在接收创建同一患者的请求并且彼此发生冲突.

conflictfree

确保每个衍生线程从导入文件中获取一个新行,并仅在失败时重复.

如果你不能这样做,并且想要坚持互斥,使用文件锁似乎不是一个非常好的解决方案,因为现在你解决了应用程序中的冲突,而它实际上发生在你的数据库中.数据库锁定也应该更快,总体上更合适的解决方案.

请求数据库锁,如下所示:

$ db - > exec('LOCK TABLES table1WRITE,table2WRITE');

当您写入锁定表时,您可能会遇到SQL错误,因此请使用try catch环绕Patient-> save().

更好的解决方案是使用条件原子查询.一个DB查询,其中也包含条件.你可以使用这样的查询:

INSERT INTO targetTable(field1) 
SELECT field1
FROM myTable
WHERE NOT(field1 IN (SELECT field1 FROM targetTable))
Run Code Online (Sandbox Code Playgroud)


Mar*_*555 5

您的示例代码将阻止第二个请求,直到第一个请求完成.您需要使用LOCK_NB选项flock()立即返回错误而不是等待.

是的,您可以在文件系统级别或直接在数据库中使用锁定或信号量.

在您需要仅处理每个导入文件一次的情况下,最佳解决方案是为每个导入文件创建一个带有行的SQL表.在导入开始时,您插入正在导入的信息,因此其他线程将知道不再处理它.导入完成后,将其标记为.(然后几个小时后你可以检查表格,看看导入是否真的完成了.)

此外,最好是在单独的脚本上执行此类一次性持久性操作,而不是在为访问者提供正常网页时执行此操作.例如,您可以安排一个夜间cron作业,该作业将获取导入文件并对其进行处理.


小智 -1

我看到三个选项:
- 使用互斥体/信号量/其他一些标志 - 不容易编码和维护
- 使用数据库内置事务机制
- 使用队列(如 RabbitMQ 或 0MQ)将消息连续写入数据库