goe*_*itz 6 java iterator asynchronous scala java-stream
我必须使用hasNext()和next()方法实现Iterator接口(由Java API定义),该接口应返回源自异步处理的HTTP响应(使用Akka actor处理)的结果元素.
必须满足以下要求:
我还没有研究过Java 8流或Akka流.但由于我基本上必须遍历队列(有限流),我怀疑还有任何合适的解决方案.
目前,我的Scala实现存根使用java.util.concurrent.BlockingQueue,如下所示:
class ResultStreamIterator extends Iterator[Result] {
val resultQueue = new ArrayBlockingQueue[Option[Result]](100)
def hasNext(): Boolean = ??? // return true if not done yet
def next(): Result = ??? // take() next element if not done yet
case class Result(value: Any) // sent by result producing actor
case object Done // sent by result producing actor when finished
class ResultCollector extends Actor {
def receive = {
case Result(value) => resultQueue.put(Some(value))
case Done => resultQueue.put(None)
}
}
}
Run Code Online (Sandbox Code Playgroud)
我使用Option [Result]来指示结果流的结尾为None.我已经尝试过窥视下一个元素并使用"完成"标志,但我希望有一个更简单的解决方案.
奖金问题:
我听从了jiro的建议,根据需要做了一些调整。总的来说,我喜欢将消息发送给参与者getNext()并next()实现的方法。ask这确保了任何时候只有一个线程修改队列。
但是,我不确定此实现的性能,因为ask和Await.result将为每次调用hasNext()和创建两个线程next()。
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor.{ActorRef, ActorSystem, Props, Stash}
import akka.pattern.ask
import akka.util.Timeout
case object HasNext
case object GetNext
case class Result(value: Any)
case object Done
class ResultCollector extends Actor with Stash {
val queue = scala.collection.mutable.Queue.empty[Result]
def collecting: Actor.Receive = {
case HasNext => if (queue.isEmpty) stash else sender ! true
case GetNext => if (queue.isEmpty) stash else sender ! queue.dequeue
case value: Result => unstashAll; queue += value
case Done => unstashAll; context become serving
}
def serving: Actor.Receive = {
case HasNext => sender ! queue.nonEmpty
case GetNext => sender ! { if (queue.nonEmpty) queue.dequeue else new NoSuchElementException }
}
def receive = collecting
}
class ResultStreamIteration(resultCollector: ActorRef) extends Iterator {
implicit val timeout: Timeout = Timeout(30 seconds)
override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
case b: Boolean => b
}
override def next(): Any = Await.result(resultCollector ? GetNext, Duration.Inf) match {
case Result(value: Any) => value
case e: Throwable => throw e
}
}
object Test extends App {
implicit val exec = scala.concurrent.ExecutionContext.global
val system = ActorSystem.create("Test")
val actorRef = system.actorOf(Props[ResultCollector])
Future {
for (i <- 1 to 10000) actorRef ! Result(s"Result $i"); actorRef ! Done
}
val iterator = new ResultStreamIteration(actorRef)
while (iterator.hasNext()) println(iterator.next)
system.shutdown()
}
Run Code Online (Sandbox Code Playgroud)