使用类型类建模生产者 - 消费者语义?

Jam*_*abe 5 functional-programming scala producer-consumer typeclass

如果系统中的某些实体可以充当数据或事件的生成者,而其他实体可以充当消费者,那么将这些"正交关注点"外化到生产者和消费者类型类中是否有意义?

我可以看到Haskell管道库使用这种方法,并且欣赏这个问题对于来自Haskell背景的人来说可能看起来非常基本,但是会对Scala视角和示例感兴趣,因为我看不到很多.

gwe*_*zek 2

您应该看看Matt Might 的这篇文章

\n\n

Producer它为您提供了、Consumer、 (您提到的 haskell 库中的 Pipe)的简单实现Transducer,以及如何使用它们创建 Web 服务器的示例。

\n\n

基本上每个Producer扩展Runnable都有一个私有缓冲区来输出元素。缓冲区是一个ArrayBlockingQueue线程安全的java。

\n\n

每个Consumer也是一个Runnable并且具有使用类似架构的输入缓冲器。

\n\n

当您将 a 链接Consumer到 a时Producer,您就创建了另一个Runnable。\n启动时,它将启动ProducerConsumer(它们是可运行的)并在它们之间传输数据。

\n\n

当您将 a 链接Transducer到 a时Producer,它会创建一个新的Producer.

\n\n

因此,如果您遵循他的实现,您应该能够以 haskell 的方式编写:

\n\n
listen ==> connect ==> process ==> reply\n
Run Code Online (Sandbox Code Playgroud)\n\n

以下是从上面的链接复制和改进的一些代码:

\n\n
import java.util.concurrent.ArrayBlockingQueue\n\ntrait Coroutine extends Runnable {\n    def start() {\n        val myThread = new Thread(this)\n        myThread.start()\n    }\n}\n\ntrait Producer[O] extends Coroutine {\n     private val outputs = new ArrayBlockingQueue[O](1024)\n     protected def put(output: O): Unit = outputs.put(output)\n     def next(): O = outputs.take()\n\n     def ==>[I >: O](consumer: Consumer[I]): Coroutine = {\n         val that = this\n         new Coroutine {\n             def run() {\n                 while (true) { val o = that.next(); consumer.accept(o) }\n             }\n\n             override def start() {\n                 that.start()\n                 consumer.start()\n                 super.start()\n             }\n         }\n     }\n}\n\ntrait Consumer[I] extends Coroutine {\n    private val inputs = new ArrayBlockingQueue[I] (1024)\n    def accept(input : I): Unit = inputs.put(input)\n    protected def get(): I = inputs.take()\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

以下是如何使用它:

\n\n
case class IntProducer(zero: Int) extends Producer[Int]{\n    def run(): Unit = {\n         var i = zero\n         while(true) { put(i); i += 1\xc2\xa0}\n    }\n}\n\nobject Printer extends Consumer[Any]{\n    def run(): Unit = {\n         while(true) { println(get()) }\n    }\n}\n\nval pip = IntProducer(0) ==> Printer\npip.start()\n
Run Code Online (Sandbox Code Playgroud)\n\n

要查看更多示例以及如何处理 `Transducer\xcc\x80,请查看我的 Gist

\n