Jam*_*abe 5 functional-programming scala producer-consumer typeclass
如果系统中的某些实体可以充当数据或事件的生成者,而其他实体可以充当消费者,那么将这些"正交关注点"外化到生产者和消费者类型类中是否有意义?
我可以看到Haskell管道库使用这种方法,并且欣赏这个问题对于来自Haskell背景的人来说可能看起来非常基本,但是会对Scala视角和示例感兴趣,因为我看不到很多.
您应该看看Matt Might 的这篇文章。
\n\nProducer
它为您提供了、Consumer
、 (您提到的 haskell 库中的 Pipe)的简单实现Transducer
,以及如何使用它们创建 Web 服务器的示例。
基本上每个Producer
扩展Runnable
都有一个私有缓冲区来输出元素。缓冲区是一个ArrayBlockingQueue
线程安全的java。
每个Consumer
也是一个Runnable
并且具有使用类似架构的输入缓冲器。
当您将 a 链接Consumer
到 a时Producer
,您就创建了另一个Runnable
。\n启动时,它将启动Producer
和Consumer
(它们是可运行的)并在它们之间传输数据。
当您将 a 链接Transducer
到 a时Producer
,它会创建一个新的Producer
.
因此,如果您遵循他的实现,您应该能够以 haskell 的方式编写:
\n\nlisten ==> connect ==> process ==> reply\n
Run Code Online (Sandbox Code Playgroud)\n\n以下是从上面的链接复制和改进的一些代码:
\n\nimport java.util.concurrent.ArrayBlockingQueue\n\ntrait Coroutine extends Runnable {\n def start() {\n val myThread = new Thread(this)\n myThread.start()\n }\n}\n\ntrait Producer[O] extends Coroutine {\n private val outputs = new ArrayBlockingQueue[O](1024)\n protected def put(output: O): Unit = outputs.put(output)\n def next(): O = outputs.take()\n\n def ==>[I >: O](consumer: Consumer[I]): Coroutine = {\n val that = this\n new Coroutine {\n def run() {\n while (true) { val o = that.next(); consumer.accept(o) }\n }\n\n override def start() {\n that.start()\n consumer.start()\n super.start()\n }\n }\n }\n}\n\ntrait Consumer[I] extends Coroutine {\n private val inputs = new ArrayBlockingQueue[I] (1024)\n def accept(input : I): Unit = inputs.put(input)\n protected def get(): I = inputs.take()\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n以下是如何使用它:
\n\ncase class IntProducer(zero: Int) extends Producer[Int]{\n def run(): Unit = {\n var i = zero\n while(true) { put(i); i += 1\xc2\xa0}\n }\n}\n\nobject Printer extends Consumer[Any]{\n def run(): Unit = {\n while(true) { println(get()) }\n }\n}\n\nval pip = IntProducer(0) ==> Printer\npip.start()\n
Run Code Online (Sandbox Code Playgroud)\n\n要查看更多示例以及如何处理 `Transducer\xcc\x80,请查看我的 Gist。
\n