Mr.*_*ama 2 perl multithreading
我正在寻找一种可靠的(并且可能是聪明的)方法来在 Perl 中跨可变数量的线程共享文件数据。我希望有一个设置,其中主线程从文件中读取行,而其他工作线程则处理各个条目。
到目前为止,我已经尝试使用 Thread::Queue 但并没有什么好运气。当我到达文件末尾时,大多数线程都陷入阻塞状态,等待主线程完成读取后从中读取数据。因此,线程有些陷入困境,并且 join() 无法将它们卷回。
当对队列使用非阻塞访问时,线程往往会陷入“尝试获取数据,不,它未定义,尝试获取数据......”的紧密循环中,这最终会破坏 CPU 并且不执行任何工作。即使只有一个工作线程,每个线程通常也会至少获得该紧密循环的几次迭代。加入 sleep() 并没有太大帮助,因为它只接受整数值(sleep(0) 没有用,而且 sleep(1) 太慢)。
最理想的是,我希望能够共享输入文件的流并让每个线程锁定它,从中读取一行,然后解锁它,但是禁止/不支持共享全局变量。我会首先将整个文件加载到内存中,但鉴于它有 4000 万个条目(作为较低端估计),这并不是非常可行。
所以这就是你们进来的地方。我需要一种方便的方法来实现主线程和工作线程之间的读取器/处理器设置,该方法不会浪费过多的 CPU 等待数据并将线程保留在 join()-able 中一旦读者到达文件末尾,状态。
非常感谢您的任何帮助或想法!
这个小测试对我有用。(我以前从未使用过线程,但过去曾使用叉子和管道做过同样的事情)。所以基本上需要告诉你的线程在要求它们加入之前完成,为此我在队列上添加了一个 undef 。
#!/usr/bin/env perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use constant MAX_THREADS => 5;
sub process_data
{
my( $q ) = @_;
while( defined( my $data = $q->dequeue() ) )
{
print "Thread[".threads->tid()."]: Processing data($data)\n";
}
print "Thread[".threads->tid()."]: Got end message\n";
} # END process_data
# Main program
{
my @threads;
my $q = Thread::Queue->new();
foreach ( 1 .. MAX_THREAD )
{
push( @threads, async { process_data($q) } );
}
while( my $line = <STDIN> )
{
chop( $line );
$q->enqueue( $line );
}
foreach my $thread ( @threads )
{
$q->enqueue( undef );
}
foreach my $thread ( @threads )
{
$thread->join();
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1364 次 |
| 最近记录: |