Perl Mojolicious:使用 all_settled 限制并发

h q*_*h q 4 perl promise mojolicious async-await

Mojolicious for Perl中,有没有办法限制concurrency使用时Promise->all_settled

在下面的例子中,我想限制concurrency=>10. 我用来sleep模拟阻塞操作:

use Mojolicious::Lite -signatures, -async_await;

helper isOdd_p => sub($self, $number)
{
    return Mojo::Promise->new(sub($resolve, $reject ) {
        Mojo::IOLoop->subprocess(
            sub {
                sleep 1;
                $number % 2;
            },
            sub ($subprocess, $err, @res ) {
                $reject->( $err ) if $err;
                $reject->( @res ) if @res && $res[0]==0; # reject Even
                $resolve->( @res );
            })
            
        });
};

any '/' => async sub ($c) {
    $c->render_later();
    my @promises = map { $c->isOdd_p($_) } (0..50);
    my @results = eval { await Mojo::Promise->all_settled(@promises) };
    #my @results = eval { await Mojo::Promise->map({concurrency=>10}, sub { $c->isOdd_p($_)}, (0..50) ) };
    if (my $err = $@) {
        $c->render(json => $@); # will this line be ever reached with all_settled??
    } else {
        $c->render(json => [ @results ] );
    }
};

app->start;
Run Code Online (Sandbox Code Playgroud)

ike*_*ami 5

当前,您启动所有 51 个任务,然后等待所有 51 个任务完成。

将并发限制为 10 意味着启动 10 个任务,等待一个任务完成,启动一个任务,等待一个任务完成,等等。

三个微小的改变就map足够了。(更改了两次提及allto all_settled,并更改了拒绝处理程序以继续进行。还将其从 M::P 方法更改为 sub 方法。)

sub map_all_settled {
  my ($class, $options, $cb, @items) = ('Mojo::Promise', ref $_[0] eq 'HASH' ? shift : {}, @_);
 
  return $class->all_settled(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency};
 
  my @start = map { $_->$cb } splice @items, 0, $options->{concurrency};
  my @wait  = map { $start[0]->clone } 0 .. $#items;
 
  my $start_next = sub {
    return () unless my $item = shift @items;
    my ($start_next, $chain) = (__SUB__, shift @wait);
    $_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); $start_next->() }) for $item;
    return ();
  };
 
  $_->then($start_next, sub { }) for @start;
 
  return $class->all_settled(@start, @wait);
}
Run Code Online (Sandbox Code Playgroud)
my @results = await map_all_settled({concurrency=>10}, sub { $c->isOdd_p($_)}, (0..50) );
Run Code Online (Sandbox Code Playgroud)