Ste*_*lis 14 queue scala stream fifo
我正在寻找Scala中的FIFO流,即提供功能的东西
流应该是可关闭的,并且应该阻止访问下一个元素,直到添加元素或关闭流.
实际上我有点惊讶的是,集合库没有(似乎)包含这样的数据结构,因为IMO是一个非常经典的数据结构.
我的问题:
1)我忽略了什么吗?是否已有提供此功能的课程?
2)好的,如果它没有包含在集合库中,那么它可能只是现有集合类的一个简单组合.但是,我试图找到这个简单的代码,但对于这样一个简单的问题,我的实现看起来仍然非常复杂.这样的FifoStream有更简单的解决方案吗?
class FifoStream[T] extends Closeable {
val queue = new Queue[Option[T]]
lazy val stream = nextStreamElem
private def nextStreamElem: Stream[T] = next() match {
case Some(elem) => Stream.cons(elem, nextStreamElem)
case None => Stream.empty
}
/** Returns next element in the queue (may wait for it to be inserted). */
private def next() = {
queue.synchronized {
if (queue.isEmpty) queue.wait()
queue.dequeue()
}
}
/** Adds new elements to this stream. */
def enqueue(elems: T*) {
queue.synchronized {
queue.enqueue(elems.map{Some(_)}: _*)
queue.notify()
}
}
/** Closes this stream. */
def close() {
queue.synchronized {
queue.enqueue(None)
queue.notify()
}
}
}
Run Code Online (Sandbox Code Playgroud)谢谢你的建议.我略微修改了paradigmatic的解决方案,以便toStream返回一个不可变的流(允许可重复读取),以便它符合我的需要.为了完整起见,这里是代码:
import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
lazy val toStream: Stream[A] = queue2stream
private def queue2stream: Stream[A] = queue take match {
case Some(a) => Stream cons ( a, queue2stream )
case None => Stream empty
}
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
Run Code Online (Sandbox Code Playgroud)
par*_*tic 15
在Scala中,流是"功能迭代器".人们期望它们是纯粹的(没有副作用)和不可改变的.在这种情况下,每次迭代流时都会修改队列(所以它不是纯粹的).这可能会产生很多误解,因为迭代两次相同的流,会产生两种不同的结果.
话虽这么说,你应该使用Java BlockingQueues,而不是滚动你自己的实现.它们被认为在安全性和性能方面得到了很好的实施.这是我能想到的最干净的代码(使用你的方法):
import java.util.concurrent.BlockingQueue
import scala.collection.JavaConversions._
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
def toStream: Stream[A] = queue take match {
case Some(a) => Stream cons ( a, toStream )
case None => Stream empty
}
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
object FIFOStream {
def apply[A]() = new LinkedBlockingQueue
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5012 次 |
| 最近记录: |