Perl队列和线程

Sin*_*sta 8 queue perl multithreading

我正在努力完成以下任务:

  1. 有一个线程从一个非常大的文件读取大约10GB的数据并将它们推入队列.(我不希望队列变得非常大)

  2. buildQueue线程将数据推送到队列的同时,大约有5个工作线程将排队并处理数据.

我做了一次尝试,但由于我的线程中有一个连续的循环,我的其他线程无法访问buildQueue.

我的做法可能完全错了.感谢您的帮助,非常感谢.

这是以下代码buildQueue:

sub buildQueue {
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);
    open DICT_FILE, $dict_path or die("Sorry, could not open file!");
    while (1) {
        if (<DICT_FILE>) {
            if ($queue->pending() < 100) {
                 my $query = <DICT_FILE>;
                 chomp($query);
                 $queue->enqueue($query);
                 my $count = $queue->pending();
                 print "Queue Size: $count Query: $query\n";
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

正如我所预料的那样,当执行此线程之后,其他任何事情都将被执行,因为这个线程将无法完成.

my $builder = new Thread(&buildQueue);
Run Code Online (Sandbox Code Playgroud)

由于构建器线程将运行很长一段时间,我永远不会创建工作线程.

这是整个代码:

#!/usr/bin/perl -w
use strict;
use Thread;
use Thread::Queue;


my $queue = new Thread::Queue();
my @threads;

sub buildQueue {
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);
    open dict_file, $dict_path or die("Sorry, could not open file!");
    while (1) {
        if (<dict_file>) {
            if ($queue->pending() < 100) {
                 my $query = <dict_file>;
                 chomp($query);
                 $queue->enqueue($query);
                 my $count = $queue->pending();
                 print "Queue Size: $count Query: $query\n";
            }
        }
    }
}

sub processor {
    my $query;
    while (1) {
        if ($query = $queue->dequeue) {
            print "$query\n";
        }
    }
}

my $builder = new Thread(&buildQueue);
push @threads, new Thread(&processor) for 1..5;
Run Code Online (Sandbox Code Playgroud)

小智 11

您需要标记何时希望线程退出(通过join或者detach).你有无限循环但没有last语句突破它们的事实也是一个问题.

编辑: 我也忘记了一个非常重要的部分! 每个工作线程都将阻塞,等待另一个项目处理队列,直到它们进入undef队列.因此,为什么我们undef在队列构建器完成后为每个线程专门排队一次.

尝试:

#!/usr/bin/perl -w
use strict;
use threads;
use Thread::Queue;


my $queue = new Thread::Queue();
our @threads; #Do you really need our instead of my?

sub buildQueue
{
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);

    #Three-argument open, please!
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!");
    while(my $query=<$dict_file>)
    {
        chomp($query);
        while(1)
        {   #Wait to see if our queue has < 100 items...
            if ($queue->pending() < 100) 
            {
                $queue->enqueue($query);
                print "Queue Size: " . $queue->pending . "\n";
                last; #This breaks out of the infinite loop
            }
        }
    }
    close($dict_file);
    foreach(1..5)
    {
        $queue->enqueue(undef);
    }
}

sub processor 
{
    my $query;
    while ($query = $queue->dequeue) 
    {
        print "Thread " . threads->tid . " got $query\n";
    }
}

my $builder=threads->create(\&buildQueue);
push @threads,threads->create(\&process) for 1..5;

#Waiting for our threads to finish.
$builder->join;
foreach(@threads)
{
    $_->join;
}
Run Code Online (Sandbox Code Playgroud)