在 Perl 中跨线程共享文件数据的巧妙方法

Mr.*_*ama 2 perl multithreading

我正在寻找一种可靠的(并且可能是聪明的)方法来在 Perl 中跨可变数量的线程共享文件数据。我希望有一个设置,其中主线程从文件中读取行,而其他工作线程则处理各个条目。

到目前为止,我已经尝试使用 Thread::Queue 但并没有什么好运气。当我到达文件末尾时,大多数线程都陷入阻塞状态,等待主线程完成读取后从中读取数据。因此,线程有些陷入困境,并且 join() 无法将它们卷回。

当对队列使用非阻塞访问时,线程往往会陷入“尝试获取数据,不,它未定义,尝试获取数据......”的紧密循环中,这最终会破坏 CPU 并且不执行任何工作。即使只有一个工作线程,每个线程通常也会至少获得该紧密循环的几次迭代。加入 sleep() 并没有太大帮助,因为它只接受整数值(sleep(0) 没有用,而且 sleep(1) 太慢)。

最理想的是,我希望能够共享输入文件的流并让每个线程锁定它,从中读取一行,然后解锁它,但是禁止/不支持共享全局变量。我会首先将整个文件加载到内存中,但鉴于它有 4000 万个条目(作为较低端估计),这并不是非常可行。

所以这就是你们进来的地方。我需要一种方便的方法来实现主线程和工作线程之间的读取器/处理器设置,该方法不会浪费过多的 CPU 等待数据并将线程保留在 join()-able 中一旦读者到达文件末尾,状态。

非常感谢您的任何帮助或想法!

Sod*_*ved 5

这个小测试对我有用。(我以前从未使用过线程,但过去曾使用叉子和管道做过同样的事情)。所以基本上需要告诉你的线程在要求它们加入之前完成,为此我在队列上添加了一个 undef 。

#!/usr/bin/env perl

use strict;
use warnings;

use threads;
use Thread::Queue;

use constant MAX_THREADS => 5;

sub process_data
{
    my( $q ) = @_;
    while( defined( my $data = $q->dequeue() ) )
    {
        print "Thread[".threads->tid()."]: Processing data($data)\n";
    }

    print "Thread[".threads->tid()."]: Got end message\n";
} # END process_data

# Main program
{
    my @threads;
    my $q = Thread::Queue->new();
    foreach ( 1 .. MAX_THREAD )
    {
        push( @threads, async { process_data($q) } );
    }

    while( my $line = <STDIN> )
    {
        chop( $line );
        $q->enqueue( $line );
    }

    foreach my $thread ( @threads )
    {
        $q->enqueue( undef );
    }

    foreach my $thread ( @threads )
    {
        $thread->join();
    }
}
Run Code Online (Sandbox Code Playgroud)