了解供应点块(按需供应)

cod*_*ons 12 concurrency message-passing rakudo cro raku

我无法理解supply {\xe2\x80\xa6}区块/它们创建的按需供应的用途。

\n

Live supplies (that is, the types that come from a Supplier and get new values whenever that Supplier emits a value) make sense to me \xe2\x80\x93 they\'re a version of asynchronous streams that I can use to broadcast a message from one or more senders to one or more receivers. It\'s easy to see use cases for responding to a live stream of messages: I might want to take an action every time I get a UI event from a GUI interface, or every time a chat application broadcasts that it has received a new message.

\n

But on-demand supplies don\'t make a similar amount of sense. The docs say that

\n
\n

An on-demand broadcast is like Netflix: everyone who starts streaming a movie (taps a supply), always starts it from the beginning (gets all the values), regardless of how many people are watching it right now.

\n
\n

Ok, fair enough. But why/when would I want those semantics?

\n

The examples also leave me scratching my head a bit. The Concurancy page currently provides three examples of a supply block, but two of them just emit the values from a for loop. The third is a bit more detailed:

\n
my $bread-supplier = Supplier.new;\nmy $vegetable-supplier = Supplier.new;\n \nmy $supply = supply {\n    whenever $bread-supplier.Supply {\n        emit("We\'ve got bread: " ~ $_);\n    };\n    whenever $vegetable-supplier.Supply {\n        emit("We\'ve got a vegetable: " ~ $_);\n    };\n}\n$supply.tap( -> $v { say "$v" });\n \n$vegetable-supplier.emit("Radish");   # OUTPUT: \xc2\xabWe\'ve got a vegetable: Radish\xe2\x90\xa4\xc2\xbb \n$bread-supplier.emit("Thick sliced"); # OUTPUT: \xc2\xabWe\'ve got bread: Thick sliced\xe2\x90\xa4\xc2\xbb \n$vegetable-supplier.emit("Lettuce");  # OUTPUT: \xc2\xabWe\'ve got a vegetable: Lettuce\xe2\x90\xa4\xc2\xbb \n
Run Code Online (Sandbox Code Playgroud)\n

There, the supply block is doing something. Specifically, it\'s reacting to the input of two different (live) Suppliers and then merging them into a single Supply. That does seem fairly useful.

\n

\xe2\x80\xa6 except that if I want to transform the output of two Suppliers and merge their output into a single combined stream, I can just use

\n
my $supply = Supply.merge: \n                 $bread-supplier.Supply.map(    { "We\'ve got bread: $_" }),\n                 $vegetable-supplier.Supply.map({ "We\'ve got a vegetable: $_" });\n
Run Code Online (Sandbox Code Playgroud)\n

And, indeed, if I replace the supply block in that example with the map/merge above, I get exactly the same output. Further, neither the supply block version nor the map/merge version produce any output if the tap is moved below the calls to .emit, which shows that the "on-demand" aspect of supply blocks doesn\'t really come into play here.

\n

At a more general level, I don\'t believe the Raku (or Cro) docs provide any examples of a supply block that isn\'t either in some way transforming the output of a live Supply or emitting values based on a for loop or Supply.interval. None of those seem like especially compelling use cases, other than as a different way to transform Supplys.

\n

Given all of the above, I\'m tempted to mostly write off the supply block as a construct that isn\'t all that useful, other than as a possible alternate syntax for certain Supply combinators. However, I have it on fairly good authority that

\n
\n

while Supplier is often reached for, many times one would be better off writing a supply block that emits the values.

\n
\n

Given that, I\'m willing to hazard a pretty confident guess that I\'m missing something about supply blocks. I\'d appreciate any insight into what that might be.

\n

Jon*_*ton 10

既然你提到了Supply.merge,我们就从那开始吧。想象一下,它不在 Raku 标准库中,而我们必须实现它。为了达到正确的实施,我们必须注意什么?至少:

  1. 产生一个Supply结果,当点击时,将...
  2. 点击(即订阅)所有输入供应。
  3. 当其中一个输入提供emit一个值时,emit它会提供给我们的点击器......
  4. ...但请确保我们遵循串行供应规则,即我们emit一次只能发送一条消息;我们的两个输入源可能会emit同时从不同的线程获取值,因此这不是一个自动属性。
  5. 当我们所有的供应品都发送了他们的done事件后,done也发送该事件。
  6. 如果我们点击的任何输入电源发送了quit事件,则中继该事件,并关闭所有其他输入电源的点击。
  7. 确保我们没有任何奇怪的竞争会导致破坏供应语法emit* [done|quit]
  8. 当我们生成的结果上的抽头Supply关闭时,请务必关闭我们抽头的所有(仍处于活动状态)输入电源上的抽头。

祝你好运!

那么标准库是如何做到的呢?像这样:

method merge(*@s) {
    @s.unshift(self) if self.DEFINITE;  # add if instance method
    # [I elided optimizations for when there are 0 or 1 things to merge]
    supply {
        for @s {
            whenever $_ -> \value { emit(value) }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

块的目的supply是大大简化在一个或多个对象上正确实现可重用Supply操作的过程。它旨在消除的主要风险是:

  • 在我们点击多个消息的情况下,无法正确处理同时到达的消息Supply,可能会导致我们损坏状态(因为我们可能希望编写的许多供应组合器也将具有状态;merge非常简单,不这样做)。块supply向我们保证一次只会处理一条消息,从而消除了这种危险。
  • 失去订阅跟踪,从而泄漏资源,这将成为任何长时间运行的程序中的一个问题。

第二个很容易被忽视,尤其是在使用像 Raku 这样的垃圾收集语言时。事实上,如果我开始迭代一些Seq,然后在到达末尾之前停止这样做,迭代器将变得无法访问,并且 GC 会在一段时间后吃掉它。如果我正在迭代文件的行并且那里有一个隐式文件句柄,那么我将面临文件无法及时关闭的风险,并且如果我不幸的话,可能会用完句柄,但至少有一些路径可以到达它关闭并释放资源。

反应式编程则不然:引用从生产者指向消费者,因此如果消费者“停止关心”但尚未关闭水龙头,那么生产者将保留其对消费者的引用(从而导致内存泄漏)并继续发送它发送消息(从而完成一次性工作)。这最终可能会导致应用程序崩溃。链接的 Cro 聊天示例就是一个示例:

my $chat = Supplier.new;

get -> 'chat' {
    web-socket -> $incoming {
        supply {
            whenever $incoming -> $message {
                $chat.emit(await $message.body-text);
            }
            whenever $chat -> $text {
                emit $text;
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

当 WebSocket 客户端断开连接时会发生什么?Supply我们使用块返回的Tapsupply被关闭,导致close传入 WebSocket 消息的 Tap 和$chat. 如果没有这个,订阅者列表$chat Supplier将无限制地增长,并且反过来也为每个先前的连接保持一定大小的对象图。

因此,即使在这种情况下,直播Supply是非常直接参与的,我们也经常会随着时间的推移而订阅它。按需供应主要是资源的获取和释放;有时,该资源将是对实时Supply.

一个公平的问题是我们是否可以在没有块的情况下编写这个示例supply。是的,我们可以;这可能有效:

my $chat = Supplier.new;

get -> 'chat' {
    web-socket -> $incoming {
        my $emit-and-discard = $incoming.map(-> $message {
                $chat.emit(await $message.body-text);
                Supply.from-list()
            }).flat;
        Supply.merge($chat, $emit-and-discard)
    }
}
Run Code Online (Sandbox Code Playgroud)

注意到它在Supply-space 中做出了一些努力,映射到了空无一物。我个人发现可读性较差 - 这甚至没有避免块supply,它只是隐藏在merge. 更棘手的情况是,所利用的资源数量随着时间的推移而变化,例如在递归文件监视中,可能会出现要监视的新目录。我真的不知道如何用标准库中出现的组合器来表达这一点。

我花了一些时间教授反应式编程(不是使用 Raku,而是使用 .Net)。对于一个异步流来说事情很容易,但是当我们开始处理具有多个异步流的情况时事情变得更加困难。有些东西自然适合组合符,例如“合并”或“zip”或“组合最新”。只要有足够的创造力,其他人就可以被打造成这样的形状——但对我来说,它常常感觉扭曲而不是富有表现力。当问题无法用组合器表达时会发生什么?用 Raku 术语来说,就是创建输出Supplier、利用输入电源、编写将输入的内容发送到输出的逻辑,等等。每次都必须处理订阅管理、错误传播、完成传播和并发控制——而且很容易搞砸。

当然,supply区块的存在也并不能阻止Raku走上脆弱的道路。这就是我说的意思:

虽然经常会联系供应商,但很多时候最好编写一个发出值的供应块

我在这里并没有考虑发布/订阅的情况,我们确实想要广播值并且处于反应链的入口点。我正在考虑这样的情况:我们点击一​​个或多个Supply,获取值,做一些事情,然后将emit事情放入另一个中Supplier。这是我将此类代码迁移到块的示例supply这是稍后在同一代码库中出现的另一个示例。希望这些例子能澄清我的想法。