Luk*_*uke 28 php concurrency mutex eloquent
所以我刚刚意识到PHP可能同时运行多个请求.昨晚的日志似乎显示有两个请求进入,并行处理; 每个触发从另一个服务器导入数据; 每个都试图将记录插入数据库.当它试图插入另一个线程刚刚插入一条记录的一个请求失败(导入的数据自带的PK,我没有使用递增的ID): SQLSTATE[23000]: Integrity constraint violation: 1062 Duplicate entry '865020' for key 'PRIMARY' ....
以下是一些代码.我已经删除了大部分内容(日志记录,从数据中创建患者以外的其他实体),但以下内容应包括相关的代码段.请求命中了import()方法,该方法基本上为每个要导入的记录调用importOne().注意importOne()中的save方法; 这是一个Eloquent方法(使用Laravel和Eloquent),它将生成SQL以适当地插入/更新记录.
public function import()
{
$now = Carbon::now();
// Get data from the other server in the time range from last import to current import
$calls = $this->getCalls($this->getLastImport(), $now);
// For each call to import, insert it into the DB (or update if it already exists)
foreach ($calls as $call) {
$this->importOne($call);
}
// Update the last import time to now so that the next import uses the correct range
$this->setLastImport($now);
}
private function importOne($call)
{
// Get the existing patient for the call, or create a new one
$patient = Patient::where('id', '=', $call['PatientID'])->first();
$isNewPatient = $patient === null;
if ($isNewPatient) {
$patient = new Patient(array('id' => $call['PatientID']));
}
// Set the fields
$patient->given_name = $call['PatientGivenName'];
$patient->family_name = $call['PatientFamilyName'];
// Save; will insert/update appropriately
$patient->save();
}
Run Code Online (Sandbox Code Playgroud)
我猜这个解决方案需要在整个导入块周围使用互斥锁?如果请求无法获得互斥锁,则只需继续执行其余请求即可.思考?
编辑:请注意,这不是一个严重的失败.捕获并记录异常,然后按照惯例响应请求.导入在另一个请求上成功,然后按照惯例响应该请求.用户是非常聪明的; 他们甚至不知道导入,这不是请求进入的主要焦点.所以,实际上,我可以保持原样运行,除了偶尔的例外,没有什么不好的事情发生.但是,如果有一个修复程序可以防止进行额外的工作/多个请求被不必要地发送到另一个服务器,那么这可能是值得追求的.
编辑2:好的,我已经采用flock()实现锁定机制.思考?以下工作会怎样?我将如何对此添加进行单元测试?
public function import()
{
try {
$fp = fopen('/tmp/lock.txt', 'w+');
if (flock($fp, LOCK_EX)) {
$now = Carbon::now();
$calls = $this->getCalls($this->getLastImport(), $now);
foreach ($calls as $call) {
$this->importOne($call);
}
$this->setLastImport($now);
flock($fp, LOCK_UN);
// Log success.
} else {
// Could not acquire file lock. Log this.
}
fclose($fp);
} catch (Exception $ex) {
// Log failure.
}
}
Run Code Online (Sandbox Code Playgroud)
EDIT3:关于锁的以下替代实现的想法:
public function import()
{
try {
if ($this->lock()) {
$now = Carbon::now();
$calls = $this->getCalls($this->getLastImport(), $now);
foreach ($calls as $call) {
$this->importOne($call);
}
$this->setLastImport($now);
$this->unlock();
// Log success
} else {
// Could not acquire DB lock. Log this.
}
} catch (Exception $ex) {
// Log failure
}
}
/**
* Get a DB lock, returns true if successful.
*
* @return boolean
*/
public function lock()
{
return DB::SELECT("SELECT GET_LOCK('lock_name', 1) AS result")[0]->result === 1;
}
/**
* Release a DB lock, returns true if successful.
*
* @return boolean
*/
public function unlock()
{
return DB::select("SELECT RELEASE_LOCK('lock_name') AS result")[0]->result === 1;
}
Run Code Online (Sandbox Code Playgroud)
看起来你似乎没有竞争条件,因为ID来自导入文件,如果你的导入算法正常工作,那么每个线程都有自己的工作分片,不应该与之冲突其他.现在看来,由于算法不好,2个线程正在接收创建同一患者的请求并且彼此发生冲突.

确保每个衍生线程从导入文件中获取一个新行,并仅在失败时重复.
如果你不能这样做,并且想要坚持互斥,使用文件锁似乎不是一个非常好的解决方案,因为现在你解决了应用程序中的冲突,而它实际上发生在你的数据库中.数据库锁定也应该更快,总体上更合适的解决方案.
请求数据库锁,如下所示:
$ db - > exec('LOCK TABLES table1WRITE,table2WRITE');
当您写入锁定表时,您可能会遇到SQL错误,因此请使用try catch环绕Patient-> save().
更好的解决方案是使用条件原子查询.一个DB查询,其中也包含条件.你可以使用这样的查询:
INSERT INTO targetTable(field1)
SELECT field1
FROM myTable
WHERE NOT(field1 IN (SELECT field1 FROM targetTable))
Run Code Online (Sandbox Code Playgroud)
您的示例代码将阻止第二个请求,直到第一个请求完成.您需要使用LOCK_NB选项flock()立即返回错误而不是等待.
是的,您可以在文件系统级别或直接在数据库中使用锁定或信号量.
在您需要仅处理每个导入文件一次的情况下,最佳解决方案是为每个导入文件创建一个带有行的SQL表.在导入开始时,您插入正在导入的信息,因此其他线程将知道不再处理它.导入完成后,将其标记为.(然后几个小时后你可以检查表格,看看导入是否真的完成了.)
此外,最好是在单独的脚本上执行此类一次性持久性操作,而不是在为访问者提供正常网页时执行此操作.例如,您可以安排一个夜间cron作业,该作业将获取导入文件并对其进行处理.
| 归档时间: |
|
| 查看次数: |
1738 次 |
| 最近记录: |