Sin*_*sta 8 queue perl multithreading
我正在努力完成以下任务:
有一个线程从一个非常大的文件读取大约10GB的数据并将它们推入队列.(我不希望队列变得非常大)
当buildQueue线程将数据推送到队列的同时,大约有5个工作线程将排队并处理数据.
我做了一次尝试,但由于我的线程中有一个连续的循环,我的其他线程无法访问buildQueue.
我的做法可能完全错了.感谢您的帮助,非常感谢.
这是以下代码buildQueue:
sub buildQueue {
print "Enter a file name: ";
my $dict_path = <STDIN>;
chomp($dict_path);
open DICT_FILE, $dict_path or die("Sorry, could not open file!");
while (1) {
if (<DICT_FILE>) {
if ($queue->pending() < 100) {
my $query = <DICT_FILE>;
chomp($query);
$queue->enqueue($query);
my $count = $queue->pending();
print "Queue Size: $count Query: $query\n";
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
正如我所预料的那样,当执行此线程之后,其他任何事情都将被执行,因为这个线程将无法完成.
my $builder = new Thread(&buildQueue);
Run Code Online (Sandbox Code Playgroud)
由于构建器线程将运行很长一段时间,我永远不会创建工作线程.
这是整个代码:
#!/usr/bin/perl -w
use strict;
use Thread;
use Thread::Queue;
my $queue = new Thread::Queue();
my @threads;
sub buildQueue {
print "Enter a file name: ";
my $dict_path = <STDIN>;
chomp($dict_path);
open dict_file, $dict_path or die("Sorry, could not open file!");
while (1) {
if (<dict_file>) {
if ($queue->pending() < 100) {
my $query = <dict_file>;
chomp($query);
$queue->enqueue($query);
my $count = $queue->pending();
print "Queue Size: $count Query: $query\n";
}
}
}
}
sub processor {
my $query;
while (1) {
if ($query = $queue->dequeue) {
print "$query\n";
}
}
}
my $builder = new Thread(&buildQueue);
push @threads, new Thread(&processor) for 1..5;
Run Code Online (Sandbox Code Playgroud)
小智 11
您需要标记何时希望线程退出(通过join或者detach).你有无限循环但没有last语句突破它们的事实也是一个问题.
编辑: 我也忘记了一个非常重要的部分! 每个工作线程都将阻塞,等待另一个项目处理队列,直到它们进入undef队列.因此,为什么我们undef在队列构建器完成后为每个线程专门排队一次.
尝试:
#!/usr/bin/perl -w
use strict;
use threads;
use Thread::Queue;
my $queue = new Thread::Queue();
our @threads; #Do you really need our instead of my?
sub buildQueue
{
print "Enter a file name: ";
my $dict_path = <STDIN>;
chomp($dict_path);
#Three-argument open, please!
open my $dict_file, "<",$dict_path or die("Sorry, could not open file!");
while(my $query=<$dict_file>)
{
chomp($query);
while(1)
{ #Wait to see if our queue has < 100 items...
if ($queue->pending() < 100)
{
$queue->enqueue($query);
print "Queue Size: " . $queue->pending . "\n";
last; #This breaks out of the infinite loop
}
}
}
close($dict_file);
foreach(1..5)
{
$queue->enqueue(undef);
}
}
sub processor
{
my $query;
while ($query = $queue->dequeue)
{
print "Thread " . threads->tid . " got $query\n";
}
}
my $builder=threads->create(\&buildQueue);
push @threads,threads->create(\&process) for 1..5;
#Waiting for our threads to finish.
$builder->join;
foreach(@threads)
{
$_->join;
}
Run Code Online (Sandbox Code Playgroud)