cod*_*ons 12 concurrency message-passing rakudo cro raku
我无法理解supply {\xe2\x80\xa6}区块/它们创建的按需供应的用途。
Live supplies (that is, the types that come from a Supplier and get new values whenever that Supplier emits a value) make sense to me \xe2\x80\x93 they\'re a version of asynchronous streams that I can use to broadcast a message from one or more senders to one or more receivers. It\'s easy to see use cases for responding to a live stream of messages: I might want to take an action every time I get a UI event from a GUI interface, or every time a chat application broadcasts that it has received a new message.
But on-demand supplies don\'t make a similar amount of sense. The docs say that
\n\n\nAn on-demand broadcast is like Netflix: everyone who starts streaming a movie (taps a supply), always starts it from the beginning (gets all the values), regardless of how many people are watching it right now.
\n
Ok, fair enough. But why/when would I want those semantics?
\nThe examples also leave me scratching my head a bit. The Concurancy page currently provides three examples of a supply block, but two of them just emit the values from a for loop. The third is a bit more detailed:
my $bread-supplier = Supplier.new;\nmy $vegetable-supplier = Supplier.new;\n \nmy $supply = supply {\n whenever $bread-supplier.Supply {\n emit("We\'ve got bread: " ~ $_);\n };\n whenever $vegetable-supplier.Supply {\n emit("We\'ve got a vegetable: " ~ $_);\n };\n}\n$supply.tap( -> $v { say "$v" });\n \n$vegetable-supplier.emit("Radish"); # OUTPUT: \xc2\xabWe\'ve got a vegetable: Radish\xe2\x90\xa4\xc2\xbb \n$bread-supplier.emit("Thick sliced"); # OUTPUT: \xc2\xabWe\'ve got bread: Thick sliced\xe2\x90\xa4\xc2\xbb \n$vegetable-supplier.emit("Lettuce"); # OUTPUT: \xc2\xabWe\'ve got a vegetable: Lettuce\xe2\x90\xa4\xc2\xbb \nRun Code Online (Sandbox Code Playgroud)\nThere, the supply block is doing something. Specifically, it\'s reacting to the input of two different (live) Suppliers and then merging them into a single Supply. That does seem fairly useful.
\xe2\x80\xa6 except that if I want to transform the output of two Suppliers and merge their output into a single combined stream, I can just use
my $supply = Supply.merge: \n $bread-supplier.Supply.map( { "We\'ve got bread: $_" }),\n $vegetable-supplier.Supply.map({ "We\'ve got a vegetable: $_" });\nRun Code Online (Sandbox Code Playgroud)\nAnd, indeed, if I replace the supply block in that example with the map/merge above, I get exactly the same output. Further, neither the supply block version nor the map/merge version produce any output if the tap is moved below the calls to .emit, which shows that the "on-demand" aspect of supply blocks doesn\'t really come into play here.
At a more general level, I don\'t believe the Raku (or Cro) docs provide any examples of a supply block that isn\'t either in some way transforming the output of a live Supply or emitting values based on a for loop or Supply.interval. None of those seem like especially compelling use cases, other than as a different way to transform Supplys.
Given all of the above, I\'m tempted to mostly write off the supply block as a construct that isn\'t all that useful, other than as a possible alternate syntax for certain Supply combinators. However, I have it on fairly good authority that
\n\nwhile Supplier is often reached for, many times one would be better off writing a supply block that emits the values.
\n
Given that, I\'m willing to hazard a pretty confident guess that I\'m missing something about supply blocks. I\'d appreciate any insight into what that might be.
Jon*_*ton 10
既然你提到了Supply.merge,我们就从那开始吧。想象一下,它不在 Raku 标准库中,而我们必须实现它。为了达到正确的实施,我们必须注意什么?至少:
Supply结果,当点击时,将...emit一个值时,emit它会提供给我们的点击器......emit一次只能发送一条消息;我们的两个输入源可能会emit同时从不同的线程获取值,因此这不是一个自动属性。done事件后,done也发送该事件。quit事件,则中继该事件,并关闭所有其他输入电源的点击。emit* [done|quit]。Supply关闭时,请务必关闭我们抽头的所有(仍处于活动状态)输入电源上的抽头。祝你好运!
那么标准库是如何做到的呢?像这样:
method merge(*@s) {
@s.unshift(self) if self.DEFINITE; # add if instance method
# [I elided optimizations for when there are 0 or 1 things to merge]
supply {
for @s {
whenever $_ -> \value { emit(value) }
}
}
}
Run Code Online (Sandbox Code Playgroud)
块的目的supply是大大简化在一个或多个对象上正确实现可重用Supply操作的过程。它旨在消除的主要风险是:
Supply,可能会导致我们损坏状态(因为我们可能希望编写的许多供应组合器也将具有状态;merge非常简单,不这样做)。块supply向我们保证一次只会处理一条消息,从而消除了这种危险。第二个很容易被忽视,尤其是在使用像 Raku 这样的垃圾收集语言时。事实上,如果我开始迭代一些Seq,然后在到达末尾之前停止这样做,迭代器将变得无法访问,并且 GC 会在一段时间后吃掉它。如果我正在迭代文件的行并且那里有一个隐式文件句柄,那么我将面临文件无法及时关闭的风险,并且如果我不幸的话,可能会用完句柄,但至少有一些路径可以到达它关闭并释放资源。
反应式编程则不然:引用从生产者指向消费者,因此如果消费者“停止关心”但尚未关闭水龙头,那么生产者将保留其对消费者的引用(从而导致内存泄漏)并继续发送它发送消息(从而完成一次性工作)。这最终可能会导致应用程序崩溃。链接的 Cro 聊天示例就是一个示例:
my $chat = Supplier.new;
get -> 'chat' {
web-socket -> $incoming {
supply {
whenever $incoming -> $message {
$chat.emit(await $message.body-text);
}
whenever $chat -> $text {
emit $text;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
当 WebSocket 客户端断开连接时会发生什么?Supply我们使用块返回的Tapsupply被关闭,导致close传入 WebSocket 消息的 Tap 和$chat. 如果没有这个,订阅者列表$chat Supplier将无限制地增长,并且反过来也为每个先前的连接保持一定大小的对象图。
因此,即使在这种情况下,直播Supply是非常直接参与的,我们也经常会随着时间的推移而订阅它。按需供应主要是资源的获取和释放;有时,该资源将是对实时Supply.
一个公平的问题是我们是否可以在没有块的情况下编写这个示例supply。是的,我们可以;这可能有效:
my $chat = Supplier.new;
get -> 'chat' {
web-socket -> $incoming {
my $emit-and-discard = $incoming.map(-> $message {
$chat.emit(await $message.body-text);
Supply.from-list()
}).flat;
Supply.merge($chat, $emit-and-discard)
}
}
Run Code Online (Sandbox Code Playgroud)
注意到它在Supply-space 中做出了一些努力,映射到了空无一物。我个人发现可读性较差 - 这甚至没有避免块supply,它只是隐藏在merge. 更棘手的情况是,所利用的资源数量随着时间的推移而变化,例如在递归文件监视中,可能会出现要监视的新目录。我真的不知道如何用标准库中出现的组合器来表达这一点。
我花了一些时间教授反应式编程(不是使用 Raku,而是使用 .Net)。对于一个异步流来说事情很容易,但是当我们开始处理具有多个异步流的情况时事情变得更加困难。有些东西自然适合组合符,例如“合并”或“zip”或“组合最新”。只要有足够的创造力,其他人就可以被打造成这样的形状——但对我来说,它常常感觉扭曲而不是富有表现力。当问题无法用组合器表达时会发生什么?用 Raku 术语来说,就是创建输出Supplier、利用输入电源、编写将输入的内容发送到输出的逻辑,等等。每次都必须处理订阅管理、错误传播、完成传播和并发控制——而且很容易搞砸。
当然,supply区块的存在也并不能阻止Raku走上脆弱的道路。这就是我说的意思:
虽然经常会联系供应商,但很多时候最好编写一个发出值的供应块
我在这里并没有考虑发布/订阅的情况,我们确实想要广播值并且处于反应链的入口点。我正在考虑这样的情况:我们点击一个或多个Supply,获取值,做一些事情,然后将emit事情放入另一个中Supplier。这是我将此类代码迁移到块的示例supply;这是稍后在同一代码库中出现的另一个示例。希望这些例子能澄清我的想法。