Mar*_*coS 10 parallel-processing perl fetch
我需要从许多Web数据提供程序中获取一些数据,这些数据提供程序不公开任何服务,因此我必须编写类似这样的内容,例如使用WWW :: Mechanize:
use WWW::Mechanize;
@urls = ('http://www.first.data.provider.com', 'http://www.second.data.provider.com', 'http://www.third.data.provider.com');
%results = {};
foreach my $url (@urls) {
$mech = WWW::Mechanize->new();
$mech->get($url);
$mech->form_number(1);
$mech->set_fields('user' => 'myuser', pass => 'mypass');
$resp = $mech->submit();
$results{$url} = parse($resp->content());
}
consume(%results);
Run Code Online (Sandbox Code Playgroud)
是否有一些(可能是简单的:-)方式将数据同时提取到一个公共的%结果变量,即:并行地从所有提供者那里获取?
jro*_*way 25
threads在Perl中应避免使用. use threads主要用于在Windows上模拟UNIX样式的fork; 除此之外,它毫无意义.
(如果你关心,实现使这个事实非常明确.在perl中,解释器是一个PerlInterpreter对象.threads
工作方式是通过制作一堆线程,然后在每个线程中创建一个全新的PerlInterpreter对象.线程绝对没有共享,甚至比子进程更少; fork让你复制写入,但是threads,所有复制都在Perl空间完成!慢!)
如果你想在同一个过程中同时做很多事情,那么在Perl中这样做的方法是使用事件循环,比如
EV,
Event或
POE,或者使用Coro.(您也可以根据AnyEventAPI 编写代码,这将允许您使用任何事件循环.这是我更喜欢的.)两者之间的区别在于您编写代码的方式.
AnyEvent(以及EV,Event,POE等)强制您以面向回调的方式编写代码.控制是一种持续传递的方式,而不是从顶部到底部的控制流动.函数不返回值,它们使用其结果调用其他函数.这允许您并行运行许多IO操作 - 当给定的IO操作产生结果时,将调用处理这些结果的函数.当另一个IO操作完成时,将调用该函数.等等.
这种方法的缺点是你必须重写你的代码.所以有一个叫做CoroPerl真实(用户空间)线程的模块,它允许你从上到下编写你的代码,但仍然是非阻塞的.(这样做的缺点在于它大大改变了Perl的内部结构.但它看起来效果很好.)
因此,由于我们 今晚不想重写 WWW :: Mechanize,我们将使用Coro.Coro带有一个名为Coro :: LWP的模块 ,它可以使对LWP的所有调用都是非阻塞的.它将阻止当前线程(Coro lingo中的"coroutine"),但它不会阻止任何其他线程.这意味着您可以同时发出大量请求,并在结果可用时对其进行处理.Coro比您的网络连接更好地扩展; 每个协同程序只使用几k内存,因此很容易拥有数万个内存.
考虑到这一点,让我们看一些代码.这是一个并行启动三个HTTP请求的程序,并打印每个响应的长度.它与您正在做的相似,减去实际处理; 但是你可以把你的代码放在我们计算长度的地方,它会起作用.
我们将从通常的Perl脚本样板开始:
#!/usr/bin/env perl
use strict;
use warnings;
Run Code Online (Sandbox Code Playgroud)
然后我们将加载Coro特定的模块:
use Coro;
use Coro::LWP;
use EV;
Run Code Online (Sandbox Code Playgroud)
Coro在幕后使用了一个事件循环; 如果你愿意,它会为你选择一个,但我们只是明确指定EV.这是最好的事件循环.
然后我们将加载我们工作所需的模块,这只是:
use WWW::Mechanize;
Run Code Online (Sandbox Code Playgroud)
现在我们准备编写我们的程序了.首先,我们需要一个URL列表:
my @urls = (
'http://www.google.com/',
'http://www.jrock.us/',
'http://stackoverflow.com/',
);
Run Code Online (Sandbox Code Playgroud)
然后我们需要一个函数来生成一个线程并完成我们的工作.为了让科罗上一个新的线程,你叫async喜欢async { body; of the
thread; goes here }.这将创建一个线程,启动它,并继续执行该程序的其余部分.
sub start_thread($) {
my $url = shift;
return async {
say "Starting $url";
my $mech = WWW::Mechanize->new;
$mech->get($url);
printf "Done with $url, %d bytes\n", length $mech->content;
};
}
Run Code Online (Sandbox Code Playgroud)
所以这是我们计划的核心.我们只是把我们正常的LWP程序放在异步中,它将神奇地非阻塞. get块,但其他协同程序将在等待它从网络获取数据时运行.
现在我们只需要启动线程:
start_thread $_ for @urls;
Run Code Online (Sandbox Code Playgroud)
最后,我们想要开始处理事件:
EV::loop;
Run Code Online (Sandbox Code Playgroud)
就是这样.当你运行它时,你会看到一些输出,如:
Starting http://www.google.com/
Starting http://www.jrock.us/
Starting http://stackoverflow.com/
Done with http://www.jrock.us/, 5456 bytes
Done with http://www.google.com/, 9802 bytes
Done with http://stackoverflow.com/, 194555 bytes
Run Code Online (Sandbox Code Playgroud)
如您所见,请求是并行进行的,您不必诉诸threads!
更新
您在原始帖子中提到要限制并行运行的HTTP请求数.一种方法是使用信号量, Coro中的Coro :: Semaphore.
信号量就像一个计数器.当您想要使用信号量保护的资源时,您可以"降低"信号量.这会减少计数器并继续运行程序.但是如果当你试图降低信号量时计数器为零,你的线程/协同程序将进入睡眠状态,直到它为非零.当计数再次上升时,您的线程将在信号量下唤醒并继续.最后,当您使用信号量保护的资源时,您"提升"信号量并为其他线程提供运行的机会.
这使您可以控制对共享资源的访问,例如"发出HTTP请求".
您需要做的就是创建HTTP请求线程将共享的信号量:
my $sem = Coro::Semaphore->new(5);
Run Code Online (Sandbox Code Playgroud)
5意味着"让我们在阻止之前'向下'调用'5次",换句话说,"让5个并发的HTTP请求".
在我们添加任何代码之前,让我们谈谈可能出现的问题.可能发生的一件坏事是一个线程"向下" - 信号量,但从来没有"向上" - 当它完成时它.然后没有什么可以使用该资源,你的程序可能最终什么也不做.有很多方法可以实现.如果你写了一些类似的代码$sem->down; do something; $sem->up,你可能会觉得安全,但是如果"做某事"会引发异常呢?那么信号量就会被遗忘,这很糟糕.
幸运的是,Perl可以很容易地拥有范围Guard对象,当持有对象的变量超出范围时,它将自动运行代码.我们可以制作代码$sem->up,然后在我们不打算时,我们永远不必担心持有资源.
Coro :: Semaphore集成了守卫的概念,这意味着您可以说my $guard = $sem->guard,当控制从您呼叫的范围流出时,它将自动向下移动信号量并向上移动guard.
考虑到这一点,我们所要做的就是限制并行请求的数量是guard我们使用HTTP的协同程序顶部的信号量:
async {
say "Waiting for semaphore";
my $guard = $sem->guard;
say "Starting";
...;
return result;
}
Run Code Online (Sandbox Code Playgroud)
处理意见:
如果您不希望您的程序永远存在,那么有一些选择.一种是在另一个线程中运行事件循环,然后join在每个工作线程上运行.这使您可以将结果从线程传递到主程序:
async { EV::loop };
# start all threads
my @running = map { start_thread $_ } @urls;
# wait for each one to return
my @results = map { $_->join } @running;
for my $result (@results) {
say $result->[0], ': ', $result->[1];
}
Run Code Online (Sandbox Code Playgroud)
您的线程可以返回如下结果:
sub start_thread($) {
return async {
...;
return [$url, length $mech->content];
}
}
Run Code Online (Sandbox Code Playgroud)
这是在数据结构中收集所有结果的一种方法.如果您不想返回内容,请记住所有协同程序共享状态.所以你可以把:
my %results;
Run Code Online (Sandbox Code Playgroud)
在程序的顶部,并让每个协程更新结果:
async {
...;
$results{$url} = 'whatever';
};
Run Code Online (Sandbox Code Playgroud)
当所有协同程序都运行完毕后,您的哈希将填充结果.但是,你必须让join每个协程知道答案何时准备就绪.
最后,如果您作为Web服务的一部分执行此操作,则应使用Corona等可识别协同的Web服务器.这将在协程中运行每个HTTP请求,除了能够并行发送 HTTP请求之外,还允许您并行处理多个HTTP请求.这将很好地利用内存,CPU和网络资源,并且很容易维护!
(你基本上可以将我们的程序从上面切换到处理HTTP请求的协同程序;可以在协程中创建新的协同join程序.)