在同一react块中使用不同的线程调度程序时会发生什么?

Håk*_*and 11 asynchronous perl6 raku

这是一个后续问题,何时signal()以反应块顺序依赖?

以下使用默认调度程序的代码$*SCHEDULER使用户可以在以下事件循环中按CTRL-C来立即退出:

use v6;
my %scheduler;
my $use-default-scheduler = True;
if $use-default-scheduler {
    %scheduler = scheduler => $*SCHEDULER;
}    
react {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1 ;
    }
}
Run Code Online (Sandbox Code Playgroud)

我对在同一react循环中使用两个不同的线程调度程序会发生什么感兴趣?如果我通过使用以上代码中的设置Supply.from-list()来使用默认的线程调度程序,而不是。现在,用户无法通过按立即退出该块。如果他按了该程序,则仅挂起该程序,直到按Enter。$*SCHEDULER$use-default-scheduler = FalsereactCTRL-CCTRL-C

那么,这里实际发生了什么?是否react一次只关注一个事件循环?(我在这里想象两个事件循环,一个用于默认调度程序,第一个whenever用于SIGINT信号,另一个用于$*IN.lines电源)。因此,react现在将重点放在from-list()调度程序上,$*IN.lines但是在此事件循环中以某种方式忽略了SIGINT吗?那么按CTRL-C不改变react块的状态吗?

Jon*_*ton 11

要查看实际发生的情况,让我们将程序重写为(按顺序)进行react操作。我将忽略许多细节,这些细节对手头的问题并不太重要。

为了使事情更紧凑,我将重写所提供程序的以下部分:

react {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1 ;
    }
}
Run Code Online (Sandbox Code Playgroud)

首先,a react { ... }实际上就像await supply { ... }-就是敲击supply { }块并await使其结尾。

await supply {
    whenever signal(SIGINT, |%scheduler) {
        say "Got signal";
        exit;
    }
    whenever Supply.from-list($*IN.lines, |%scheduler) {
        say "Got line";
        exit if $++ == 1 ;
    }
}
Run Code Online (Sandbox Code Playgroud)

但是什么是supply街区?在本质上,supply(等等react)提供:

  • 并发控制,因此它将一次处理一条消息(因此我们需要某种锁;它Lock::Async用于此)
  • 完成和错误传播(在明显的作弊中,我们将使用a Promise来实现这一点,因为我们只想要该react部分;真实的东西会产生结果Supply,我们可以将其emit评估)
  • 当没有未完成的订阅时,系统会自动完成(我们将跟踪中的订阅SetHash

因此,我们可以将程序重写为如下形式:

await do {
    # Concurency control
    my $lock = Lock::Async.new;
    # Completion/error conveyance
    my $done = Promise.new;
    # What's active?
    my %active is SetHash;

    # An implementation a bit like that behind the `whenever` keyword, but with
    # plenty of things that don't matter for this question missing.
    sub whenever-impl(Supply $s, &block) {
        # Tap the Supply
        my $tap;
        $s.tap:
            # When it gets tapped, add the tap to our active set.
            tap => {
                $tap = $_; 
                %active{$_} = True;
            },
            # Run the handler for any events
            { $lock.protect: { block($_) } },
            # When this one is done, remove it from the %active list; if it's
            # the last thing, we're done overall.
            done => {
                $lock.protect: {
                    %active{$tap}:delete;
                    $done.keep() unless %active;
                }
            },
            # If there's an async error, close all taps and pass it along.
            quit => {
                $lock.protect: -> $err {
                    .close for %active.keys;
                    $done.quit($err);
                }
            }
    }

    # We hold the lock while doing initial setup, so you can rely on having
    # done all initialization before processing a first message.
    $lock.protect: {
        whenever-impl signal(SIGINT, |%scheduler), {
            say "Got signal";
            exit;
        }
        whenever-impl Supply.from-list($*IN.lines, |%scheduler), {
            say "Got line";
            exit if $++ == 1 ;
        }
    }

    $done
}
Run Code Online (Sandbox Code Playgroud)

注意,这里的调度程序或事件循环什么都没有。一个supplyreact不关心消息的来源,它只关心通过实施的自身完整性Lock::Async。还要注意,它也没有引入任何并发:它实际上只是一个并发控制结构。

通常,在您使用数据源的supply同时react使用数据源tap并立即获得控制权。然后,我们继续设置其他whenever块,退出设置阶段,并且该锁可用于我们收到的任何消息。这种行为是您通常遇到的所有耗材都会得到的。就是这种情况signal(...)。当您给出Supply.from-list(...)一个明确的调度程序时,也是如此$*SCHEDULER;在这种情况下,它将调度从$*IN池中读取的循环,并立即交出控制权。

当我们遇到的东西,并在问题出现那样做的。如果我们挖掘Supply.from-list($*IN.lines),则默认为从做读取$*IN当前线程上产生的价值emit,因为Supply.from-list使用CurrentThreadScheduler它的默认值。那怎么办?只需运行要求立即运行的代码即可!

但是,这给我们带来了另一个谜。给定Lock::Async不是可重入的,那么如果我们:

  1. 获取锁进行设置
  2. 电话tap上的Supply.from-list(...),其同步并尝试运行到emit一个值
  3. 尝试获取锁,以便我们可以处理该值

然后,我们将陷入僵局,因为我们试图获取一个已经由我们持有的不可重入的锁。的确,如果您在此处运行该程序的desugar,这正是发生的情况:它挂起了。但是,原始代码不会挂起;它的表现有点尴尬。是什么赋予了?

真正的实现所要做的一件事是在设置阶段检测这种情况。然后继续执行,并在设置阶段完成后继续执行。这意味着我们可以做以下事情:

my $primes = supply {
    .emit for ^Inf .grep(*.is-prime);
}
react {
    whenever $primes { .say }
    whenever Promise.in(3) { done }
}
Run Code Online (Sandbox Code Playgroud)

并解决。我不会在这里重现这种乐趣,但是应该足够狡猾地使用gather/来实现take