创建基于时间的分块枚举

Car*_*ten 12 scala akka playframework playframework-2.0

我想创建一个Play 2 Enumeratee,它接收值并输出它们,每秒x/毫秒组合在一起.这样,在具有大量用户输入的多用户websocket环境中,可以限制每秒接收的帧数.

我知道可以将一定数量的项目组合在一起,如下所示:

val chunker = Enumeratee.grouped(
  Traversable.take[Array[Double]](5000) &>> Iteratee.consume()
)
Run Code Online (Sandbox Code Playgroud)

是否有基于时间而不是基于项目数量的内置方式来执行此操作?

我正在考虑以预定的Akka工作以某种方式做这件事,但初看起来这似乎效率低下,而且我不确定是否会出现问题.

bus*_*r84 3

像这样怎么样?我希望这对您有帮助。

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.fromCallback { () =>
       Promise.timeout(Some(queue), 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }
Run Code Online (Sandbox Code Playgroud)

这份文档对您也有帮助。 http://www.playframework.com/documentation/2.0/Enumerators

更新 这是针对play2.1版本的。

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise
 import scala.concurrent._
 import ExecutionContext.Implicits.global

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.repeatM{
       Promise.timeout(queue, 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }
Run Code Online (Sandbox Code Playgroud)