Scala中是否有FIFO流?

Ste*_*lis 14 queue scala stream fifo

我正在寻找Scala中的FIFO流,即提供功能的东西

流应该是可关闭的,并且应该阻止访问下一个元素,直到添加元素或关闭流.

实际上我有点惊讶的是,集合库没有(似乎)包含这样的数据结构,因为IMO是一个非常经典的数据结构.

我的问题:

  • 1)我忽略了什么吗?是否已有提供此功能的课程?

  • 2)好的,如果它没有包含在集合库中,那么它可能只是现有集合类的一个简单组合.但是,我试图找到这个简单的代码,但对于这样一个简单的问题,我的实现看起来仍然非常复杂.这样的FifoStream有更简单的解决方案吗?

    class FifoStream[T] extends Closeable {
    
    val queue = new Queue[Option[T]]
    
    lazy val stream = nextStreamElem
    
    private def nextStreamElem: Stream[T] = next() match {
        case Some(elem) => Stream.cons(elem, nextStreamElem)
        case None       => Stream.empty
    }
    
    /** Returns next element in the queue (may wait for it to be inserted). */
    private def next() = {
        queue.synchronized {
            if (queue.isEmpty) queue.wait()
            queue.dequeue()
        }
    }
    
    /** Adds new elements to this stream. */
    def enqueue(elems: T*) {
        queue.synchronized {
            queue.enqueue(elems.map{Some(_)}: _*)
            queue.notify()
        }
    }
    
    /** Closes this stream. */
    def close() {
        queue.synchronized {
            queue.enqueue(None)
            queue.notify()
        }
    }
    }
    
    Run Code Online (Sandbox Code Playgroud)

范式的解决方案(明显修改)

谢谢你的建议.我略微修改了paradigmatic的解决方案,以便toStream返回一个不可变的流(允许可重复读取),以便它符合我的需要.为了完整起见,这里是代码:

import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, queue2stream )
    case None    => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
Run Code Online (Sandbox Code Playgroud)

par*_*tic 15

在Scala中,流是"功能迭代器".人们期望它们是纯粹的(没有副作用)和不可改变的.在这种情况下,每次迭代流时都会修改队列(所以它不是纯粹的).这可能会产生很多误解,因为迭代两次相同的流,会产生两种不同的结果.

话虽这么说,你应该使用Java BlockingQueues,而不是滚动你自己的实现.它们被认为在安全性和性能方面得到了很好的实施.这是我能想到的最干净的代码(使用你的方法):

import java.util.concurrent.BlockingQueue
import scala.collection.JavaConversions._

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
  def toStream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, toStream )
    case None => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

object FIFOStream {
  def apply[A]() = new LinkedBlockingQueue
}
Run Code Online (Sandbox Code Playgroud)

  • 这看起来很有用!但它应该是“def apply[A]() = new FIFOStream[A](new LinkedBlockingQueue)”吗? (2认同)