试图使用fork做一个看似简单的任务,但是失败了

Ste*_* P. 3 perl

所以,基本上我有一个非常大的数组,我需要从中读取数据.我希望能够并行完成这项工作; 然而,当我尝试时,我失败了.为简单起见,假设我有一个包含100个元素的数组.我的想法是将数组分成10个等分并尝试并行读取它们(10是任意的,但我不知道我可以同时运行多少个进程,10个看起来足够低).我需要根据每个分区的读数返回一个计算(新数据结构),但我不修改原始数组中的任何内容.

我没有尝试上述内容,而是尝试了更简单的方法,但我做错了,因为它无法以任何方式工作.所以,然后我尝试简单地使用子进程推送到一个数组.下面的代码Time::HiRes用于查看我可以使用分叉运行速度多快,而不是,但我还没有达到这一点(当我接近几百万个条目时,我将会测试它在我的数组中):

use strict;
use warnings;
use Time::HiRes;

print "Starting main program\n";

my %child;
my @array=();
my $counter=0;
my $start = Time::HiRes::time();

for (my $count = 1; $count <= 10; $count++) 
{
        my $pid = fork();

        if ($pid) 
        {
                $child{$pid}++;
        } 
        elsif ($pid == 0) 
        {
                addToArray(\$counter,\@array); 
                exit 0;
        } 
        else 
        {
                die "couldnt fork: $!\n";
        }
}

while (keys %child)
{
    my $pid = waitpid(-1,0);
    delete $child{$pid};
}

my $stop = Time::HiRes::time();
my $duration = $stop-$start;

print "Time spent: $duration\n";
print "Size of array: ".scalar(@array)."\n";
print "End of main program\n";

sub addToArray 
{
my $start=shift;
my $count=${$start};
${$start}+=10;
my $array=shift;

  for (my $i=$count; $i<$count +10; $i++)
    {
     push @{$array}, $i;
    }

  print scalar(@{$array})."\n";
}
Run Code Online (Sandbox Code Playgroud)

注意:我使用push代替${$array}[$i]=$i,因为我意识到我$counter实际上并没有更新,所以这对代码来说永远不会有效.

我认为这不起作用,因为孩子们都是原始程序的副本,我实际上从未在我的"原始程序"中向数组添加任何内容.在那个问题上,我非常困难.同样,我实际上试图解决的实际问题是如何对我的数组进行分区(包含数据)并尝试并行读取它们并根据我的读数返回计算(注意:我不打算修改原始数组),但如果我无法弄清楚如何让我$counter更新,我将永远无法做到这一点.我也想知道如何让上面的代码做我想做的事情,但这是次要的目标.

一旦我可以让我的计数器正确更新,是否有任何机会在更新之前启动另一个进程并且我实际上不会读取整个数组?如果是这样,我该如何解释?

请,任何帮助将不胜感激.我很沮丧/困难.我希望有一个简单的解决方法.提前致谢.

编辑:我试图使用Parallel :: ForkManager,但无济于事:

#!/usr/local/roadm/bin/perl
use strict;
use warnings;
use Time::HiRes;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(10);

for (my $count = 1; $count <= 10; $count++) 
{
    my $pid = $pm->start and next;   
    sub1(\$counter,\@array); 
    $pm->finish; # Terminates the child process       
}
  $pm->wait_all_children;  
Run Code Online (Sandbox Code Playgroud)

我没有包含其他无关的内容,请参阅上面的遗漏代码/ sub ...再次,帮助将非常感激.我对此非常陌生,需要有人握住我的手.我也尝试用run_on_start和做某事run_on_finish,但他们也没有用.

amo*_*mon 5

您的代码有两个问题:您的子进程没有共享数据,如果分叉进程共享数据,您将遇到竞争条件.解决方案是use threads.可以通过在父线程中对数据进行分区来消除竞争条件的任何可能性,当然,也可以通过不使用共享数据来消除.

主题

Perl中的线程与forking 类似:默认情况下,没有共享内存.这使得使用线程非常容易.但是,每个线程都运行它自己的perl解释器,这使线程非常昂贵.谨慎使用.

首先,我们必须通过激活线程支持use threads.要启动一个线程,我们这样做threads->create(\&code, @args),它返回一个线程对象.然后代码将在一个单独的线程中运行,并将使用给定的参数进行调用.线程完成执行后,我们可以通过调用收集返回值$thread->join.注意:线程代码的上下文由create方法确定,而不是由join.

我们可以用:shared属性标记变量.您$counter@array将来都是这样的例子,但通常更好地传递明确的数据副本而不是使用共享状态(免责声明:从理论角度来看).为了避免使用共享数据的竞争条件,您实际上必须$counter使用信号量来保护您,但同样,不需要共享状态.

这是一个玩具程序,展示了如何使用线程来并行化计算:

use strict;
use warnings;
use threads;
use 5.010; # for `say`, and sane threads
use Test::More;

# This program calculates differences between elements of an array

my @threads;
my @array = (1, 4, 3, 5, 5, 10, 7, 8);
my @delta = ( 3, -1, 2, 0, 5, -3, 1 );

my $number_of_threads = 3;
my @partitions = partition( $#array, $number_of_threads );
say "partitions: @partitions";

for (my $lower_bound = 0; @partitions; $lower_bound += shift @partitions) {
  my $upper_bound = $lower_bound + $partitions[0];
  say "spawning thread with [@array[$lower_bound .. $upper_bound]]";
  # pass copies of the values in the array slice to new thread:
  push @threads, threads->create(\&differences, @array[$lower_bound .. $upper_bound]);
  # note that threads->create was called in list context
}

my @received;
push @received, $_->join for @threads; # will block until all are finished

is_deeply \@received, \@delta;
done_testing;

# calculates the differences. This doesn't need shared memory.
# note that @array could have been safely accessed, as it is never written to
# If I had written to a (unshared) variable, these changes would have been thread-local
sub differences {
  say "Hi from a worker thread, I have ", 0+@_, " elements to work on";
  return map $_[$_] - $_[$_-1], 1 .. $#_;
  # or more readable:
  # my @d;
  # for my $i (1 .. $#_) {
  #   push @d, $_[$i] - $_[$i-1];
  # }
  # return @d;
}

# divide workload into somewhat fair parts, giving earlier threads more work
sub partition {
  my ($total, $parts) = @_;
  my $base_size = int($total / $parts);
  my @partitions = ($base_size) x $parts;
  $partitions[$_-1]++ for 1 .. $total - $base_size*$parts;
  return @partitions;
}
Run Code Online (Sandbox Code Playgroud)

关于线程数的说明:这应该取决于系统的处理器数量.如果你有四个核心,那么超过四个线程没有多大意义.