共享电源可以同时运行多个分接块吗?

bri*_*foy 13 tap perl6 raku

考虑这个代码需要一段时间才能完成.所有块同时运行(立即输出)然后休眠.大部分都没有完成,因为程序结束的时间越早,他们就会:

my $supply = Supply.interval(0.2);
my $tap = $supply.tap: { say "1 $^a"; sleep 5;  };
sleep 5;
Run Code Online (Sandbox Code Playgroud)

输出(省略)有25行(每个刻度为1,每5秒为0.2):

1. 0
1. 1
...
1. 24
Run Code Online (Sandbox Code Playgroud)

然后我将供应改为.share:

my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
sleep 5;
Run Code Online (Sandbox Code Playgroud)

我只看到一行输入,但我期望相同的输出:

1. 1
Run Code Online (Sandbox Code Playgroud)

.share使得多个分接头可以获得相同的值.

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;
Run Code Online (Sandbox Code Playgroud)

输出仍然仅为第一个抽头输出,但仍然只有一行.我预计每行25行:

1. 1
Run Code Online (Sandbox Code Playgroud)

Jon*_*ton 13

基本规则Supply是:

  1. 如果没有明确要求,就不会引入并发性
  2. 通过发送者支付模型的背压
  3. 消息在下一个消息之前被完整处理(因此.map({ ...something with state... })可以信任不会导致状态冲突)

规则3并不真正适用,share因为在该点之后存在单独的下游操作链,但规则1和规则2.目的share是允许发布/订阅,并且还提供多个下游消息处理器的一大块处理的重用.引入并行消息处理是另外一个问题.

有各种选择.一种是将并行处理的消息插入到Channel.这明确地引入了一个缓冲消息的位置(好吧,直到你的内存耗尽......这就是为什么Supply发送者支付背压模型).将一个Channel后退强制转换为一个Supply从池线程中获取并从中Channel释放的值Supply.那样看起来像:

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;
Run Code Online (Sandbox Code Playgroud)

请注意,由于whenever自动胁迫它要求对一个反应的东西Supply,那会看起来像whenever $supply.Channel { },这使得它一个非常简短的解决方案-但在同一时间在很好地明确表示如何正常背压机制是旁边跨步.该解决方案的另一个属性是它保留了消息的顺序,并且仍然在下游提供一次一个处理Channel.

另一种方法是通过启动一些异步工作来处理它来对每条消息做出反应.start对a 的操作Supply调度传递的块以在线程池上为每个接收的消息运行,从而不阻止下一个消息的到达.其结果是SupplySupply.这迫使人们点击每个内部Supply以实际发生任何事情,这在一开始似乎有点反直觉,但实际上是为了程序员的利益:它清楚地表明还有一些额外的异步工作需要跟踪.我非常强烈建议将它与react/ wheneversyntax 结合使用,它自动执行订阅管理和错误传播.问题中代码的最直接转换是:

my $supply = Supply.interval(0.2).share;
my $tap  = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;
Run Code Online (Sandbox Code Playgroud)

虽然也可以将其写为:

my $supply = Supply.interval(0.2).share;
my $tap  = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;
Run Code Online (Sandbox Code Playgroud)

这指出了编写parallelize Supply组合子的可能性:

my $supply = Supply.interval(0.2).share;
my $tap  = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

sub parallelize(Supply $messages, &operation) {
    supply {
        whenever $messages -> $value {
            whenever start operation($value) {
                emit $_;
            }
        }
     }
}
Run Code Online (Sandbox Code Playgroud)

这种方法的输出与一种方法的输出完全不同Channel,因为一旦消息进入,操作就会全部启动.它也不会保留消息顺序.还有一个隐式队列(与使用该Channel方法的显式队列不同),它只是现在它是线程池调度程序的工作队列和OS调度程序,它必须跟踪正在进行的工作.而且,没有背压,但请注意,完全有可能通过跟踪未完成Promises并阻止进一步传入的消息来实现await Promise.anyof(@outstanding).

最后,我将注意到有一些考虑hyper wheneverrace whenever构造来提供一些语言级机制来处理Supply消息的并行处理.然而,这种语义,以及它们如何发挥supply-block设计目标和安全属性,代表了重大的设计挑战.

  • 这将成为Supply文档的一个很好的补充. (3认同)

Eli*_*sen 6

a的抽头Supply在单个线程内顺序运行.因此,第二次点击的代码将仅在第一次点击(睡眠5秒)后运行.这显示在以下代码中:

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.tap: { say "1. $^a in #{+$*THREAD}" };
my $tap2 = $supply.tap: { say "2. $^a in #{+$*THREAD}" };
sleep 0.5;
===================
1. 1 in #4
2. 1 in #4
1. 2 in #4
2. 2 in #4
Run Code Online (Sandbox Code Playgroud)

所以目前的答案是:不