我的主要目标是为大量对象做一些(外部时间昂贵的)工作.为此,如果我直接进行,则需要花费很多时间.所以我决定进入并行模式并分叉一些(4-8,让我们看看)子进程,每个子进程都为较小的一组对象完成工作.在主(父)进程中,我想打印出与单进程版本相同的整体统计信息.
但是,当我分叉4个子进程并在其中进行一些工作时,我可以看到它们还活着,但只有其中一个实际上正在做某事并将信息发送给父进程.
这是我到目前为止所做的代码 - 耗时的部分是用随机的usleep来模拟的,它很好地模拟了它的行为.
#!/usr/bin/env perl
use strict;
use warnings;
use DateTime;
use DateTime::Format::HTTP;
use Time::HiRes;
my @to_be_processed = (1..300000);
my @queues;
my $nprocs = 4;
my $parent_from_child;
my @child_from_parent;
my @child_to_parent;
$SIG{CHLD} = 'IGNORE';
$|=1; # autoflush
my %stat = (
total => scalar(@to_be_processed),
processed => 0,
time_started => [Time::HiRes::gettimeofday],
);
# divide the list into queues for each subprocess
for (my $i = 0; $i < $stat{total}; $i++ ) {
my $queue = $i % $nprocs;
push @{$queues[$queue]}, $to_be_processed[$i];
}
# for progress simulation
srand (time ^ $$);
for (my $proc = 0; $proc < $nprocs; $proc++) {
# set up the pipes
pipe $parent_from_child, $child_to_parent[$proc] or die "pipe failed - $!";
# fork
defined(my $pid = fork) or die "fork failed - $!";
if ($pid) {
# parent
close $child_to_parent[$proc];
printf("[%u] parent says: child %u created with pid %u\n", $$, $proc, $pid);
}
else {
# child
close $parent_from_child;
open(STDOUT, ">&=" . fileno($child_to_parent[$proc])) or die "open failed - $!";
warn(sprintf("[%u] child alive with %u entries\n", $$, scalar(@{$queues[$proc]})));
foreach my $id (@{$queues[$proc]}) {
printf("START: %s\n", $id);
# simulation of progress
my $random_microseconds = int(rand(3000000))+200000;
warn(sprintf("[%u] child 'works' for %u microseconds", $$, $random_microseconds));
Time::HiRes::usleep( $random_microseconds );
printf("DONE\n")
}
exit(0);
}
}
# parent: receive data from children and print overall statistics
while (<$parent_from_child>) {
chomp(my $line = $_);
if ($line =~ m/^START: (\S+)/) {
my ($id) = @_;
printf("%6u/%6u", $stat{processed}, $stat{total});
if ($stat{time_avg}) {
my $remaining = ($stat{total} - $stat{processed}) * $stat{time_avg};
my $eta = DateTime->from_epoch( epoch => time + $remaining );
$eta->set_time_zone('Europe/Berlin');
printf(" (ETA %s)", DateTime::Format::HTTP->format_isoz($eta));
}
printf("\r");
}
elsif ($line =~ /^DONE/) {
$stat{processed}++;
$stat{time_processed} = Time::HiRes::tv_interval( $stat{time_started} );
$stat{time_avg} = $stat{time_processed} / $stat{processed};
}
else {
printf("%s\n", $line);
}
}
Run Code Online (Sandbox Code Playgroud)
通常应该消除警告.如果你运行它,你应该看到只有一个孩子工作.我的问题是:为什么?我的错误在哪里?我怎样才能让所有人都做这个工作?
谢谢K.
你可以在strace下运行perl,你会发现你孩子的生命相当短,看起来像这样:
close(3) = 0
ioctl(4, SNDCTL_TMR_TIMEBASE or TCGETS, 0x7fff753b3a10) = -1 EINVAL (Invalid argument)
lseek(4, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
fstat(4, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
dup2(4, 1) = 1
dup(4) = 3
fcntl(4, F_GETFD) = 0x1 (flags FD_CLOEXEC)
dup2(3, 4) = 4
fcntl(4, F_SETFD, FD_CLOEXEC) = 0
close(3) = 0
fcntl(1, F_SETFD, 0) = 0
write(2, "[30629] child alive with 75000 e"..., 39) = 39
brk(0x3582000) = 0x3582000
write(1, "START: 1\n", 9) = -1 EPIPE (Broken pipe)
--- SIGPIPE (Broken pipe) @ 0 (0) ---
Run Code Online (Sandbox Code Playgroud)
这就是为什么:
pipe $parent_from_child, $child_to_parent[$proc] or die "pipe failed - $!";
Run Code Online (Sandbox Code Playgroud)
你已经在错误的参数上使用了数组来管道.您需要在父级中保持所有读取边打开.相反,您已经设置了一个数组,因此父级可以保持所有写侧面打开(但是在父块中,您可以立即关闭写入侧).所以下次通过你的循环,pipe创建一个新的句柄,分配给它$parent_from_child.旧的值因此没有更多的引用,并且perl清理它 - 意思是,它关闭文件句柄.所以你的孩子除了最后一个死于SIGPIPE.
我认为你的印象是你可以重新使用那个读句柄并只为它分配多个写句柄.你不能.pipe总是创建一个新的读句柄和一个新的写句柄.
如果你真的想要共享相同的读句柄(你可能不这样做,这将导致两个客户端的输出交错时出现损坏),只需在循环外创建一次.所有的孩子都将继承相同的写句柄fork.更可能的是,你想要每个孩子一个,你将不得不使用一个select循环来查看哪些输出可用,并阅读这些.
或者,我确信CPAN有一个现成的解决方案(或十个)供您使用.