小编goe*_*itz的帖子

在异步生成的元素流上迭代hasNext()和next()

我必须使用hasNext()和next()方法实现Iterator接口(由Java API定义),该接口应返回源自异步处理的HTTP响应(使用Akka actor处理)的结果元素.

必须满足以下要求:

  • 不要阻塞并等待异步操作完成,因为生成大型结果集可能需要一段时间(迭代器应该在结果元素可用时立即返回)
  • Iterator.next()应该阻塞,直到下一个元素可用(如果没有更多元素可以抛出异常)
  • Iterator.hasNext()应该返回true,只要有更多的元素要来(即使下一个元素还没有)
  • 结果总数事先未知.生成actor的结果将在完成时发送特定的"结束消息".
  • 尽量避免使用InterruptedException,例如当迭代器在空队列上等待但不会生成更多元素时.

我还没有研究过Java 8流或Akka流.但由于我基本上必须遍历队列(有限流),我怀疑还有任何合适的解决方案.

目前,我的Scala实现存根使用java.util.concurrent.BlockingQueue,如下所示:

class ResultStreamIterator extends Iterator[Result] {
    val resultQueue = new ArrayBlockingQueue[Option[Result]](100)

    def hasNext(): Boolean = ???  // return true if not done yet
    def next(): Result = ???      // take() next element if not done yet

    case class Result(value: Any) // sent by result producing actor
    case object Done              // sent by result producing actor when finished

    class ResultCollector extends Actor {
        def receive = {
           case Result(value) => …
Run Code Online (Sandbox Code Playgroud)

java iterator asynchronous scala java-stream

6
推荐指数
1
解决办法
1076
查看次数

标签 统计

asynchronous ×1

iterator ×1

java ×1

java-stream ×1

scala ×1