Tom*_*Tom 3 php multithreading symfony php-7 symfony4
(我阅读了其他问题,但它们指的是旧版本的 PHP 或前端多线程)
我有一个 PHP/PostgreSQL 应用程序,它有一个复杂的后端处理部分。本质上,有一个非常大的循环(几千次迭代)一次又一次地(有排列)遍历相同的数据。在每个循环中,读取相同的数据,应用操作,将结果写回数据库。循环彼此完全独立,循环之间不保留任何结果。事实上,为了清除对象缓存(使用 Doctrine),我每 100 次左右循环清除一次缓存。
所以我基本上有:
for ($i=0; $i<5000; $i++) {
// fetch data
// manipulate data
// write results to a different table
}
Run Code Online (Sandbox Code Playgroud)
在这些循环中从未接触过原始数据,只填充了几个结果表。
这目前需要几分钟。在我看来,我就像一个教科书式的并行处理示例。
将其置于多个威胁中的最佳方法是什么?我不太关心执行顺序,甚至工作负载是否均匀分布(根据数据操作的性质,如果所有线程运行相同数量的循环,它们最终应该或多或少具有相同的工作负载)。我想要的只是使用更多的 CPU 内核。
我已经在 PHP 5 中完成了多线程,它是......好吧......并不完美。可行,但很难。这在 PHP 7 中有改进吗?是否有一种相对简单的方法可以基本上说“for (...) 并在 n 个线程中运行它”?
万一重要,应用程序是用 Symfony4 编写的,这个后端进程是通过控制台命令调用的。
有并行线程时被重写是在V3使用更简单的扩展。它在 PHP 7.2+ 上受支持,并提供了一种在 PHP 中创建多线程应用程序的方法。
或者,由于您使用的是 Symfony - 您可以编写简单的控制台命令,该命令可以使用Process
组件将子进程作为单独的操作系统进程运行。以下是来自实际项目的此类跑步者的示例:
<?php
namespace App\Command;
use App\Command\Exception\StopCommandException;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Exception\InvalidArgumentException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Exception\RuntimeException;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use Webmozart\PathUtil\Path;
class ProcessingRunner extends AbstractCommand
{
use LockableTrait;
/**
* @var Process[]
*/
private $processes = [];
/**
* @var string[]
*/
private $cmd;
/**
* @var KernelInterface
*/
private $kernel;
/**
* @param KernelInterface $kernel
*/
public function __construct(KernelInterface $kernel)
{
parent::__construct();
$this->kernel = $kernel;
}
/**
* {@inheritdoc}
* @throws InvalidArgumentException
*/
protected function configure(): void
{
$this
->setName('app:processing:runner')
->setDescription('Run processing into multiple threads')
->addOption('threads', 't', InputOption::VALUE_REQUIRED, 'Number of threads to run at once', 1)
->addOption('at-once', 'm', InputOption::VALUE_REQUIRED, 'Amount of items to process at once', 10);
}
/**
* {@inheritdoc}
* @throws \Symfony\Component\Process\Exception\LogicException
* @throws InvalidArgumentException
* @throws RuntimeException
* @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
* @throws \InvalidArgumentException
* @throws \LogicException
*/
protected function execute(InputInterface $input, OutputInterface $output): ?int
{
if (!$this->lock()) {
$output->writeln('The command is already running in another process.');
return 0;
}
if (extension_loaded('pcntl')) {
$stop = function () {
StopCommandException::throw();
};
pcntl_signal(SIGTERM, $stop);
pcntl_signal(SIGINT, $stop);
pcntl_async_signals(true);
}
do {
try {
while (\count($this->processes) < $this->getInput()->getOption('threads')) {
$process = $this->createProcess();
$process->start();
$this->processes[] = $process;
}
$this->processes = array_filter($this->processes, function (Process $p) {
return $p->isRunning();
});
usleep(1000);
} catch (StopCommandException $e) {
try {
defined('SIGKILL') || define('SIGKILL', 9);
array_map(function (Process $p) {
$p->signal(SIGKILL);
}, $this->processes);
} catch (\Throwable $e) {
}
break;
}
} while (true);
$this->release();
return 0;
}
/**
* @return Process
* @throws RuntimeException
* @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
* @throws \InvalidArgumentException
* @throws \LogicException
* @throws InvalidArgumentException
*/
private function createProcess(): Process
{
if (!$this->cmd) {
$phpBinaryPath = (new PhpExecutableFinder())->find();
$this->cmd = [
$phpBinaryPath,
'-f',
Path::makeAbsolute('bin/console', $this->kernel->getProjectDir()),
'--',
'app:processing:worker',
'-e',
$this->kernel->getEnvironment(),
'-m',
$this->getInput()->getOption('at-once'),
];
}
return new Process($this->cmd);
}
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2925 次 |
最近记录: |