Abh*_*kar 2 scala akka reactive-streams akka-stream
我试图在包装一些阻塞调用Future.The返回类型是Seq[User]其中User的一个case class.以下只是不会编译与各种重载版本存在的投诉.有什么建议?我试过几乎所有的变化都Source.apply没有运气.
// All I want is Seq[User] => Future[Seq[User]]
def findByFirstName(firstName: String) = {
val users: Seq[User] = userRepository.findByFirstName(firstName)
val sink = Sink.fold[User, User](null)((_, elem) => elem)
val src = Source(users) // doesn't compile
src.runWith(sink)
}
Run Code Online (Sandbox Code Playgroud)
首先,我假设您使用的是1.0版,akka-http-experimental因为API可能会从之前的版本更改.
您的代码无法编译的原因是akka.stream.scaladsl.Source$.apply()需要
scala.collection.immutable.Seq而不是scala.collection.mutable.Seq.
因此,您必须使用to[T]方法将可变序列转换为不可变序列.
文件:akka.stream.scaladsl.Source
此外,当您看到文档时,Source$.apply()接受,()=>Iterator[T]因此您也可以()=>users.iterator作为参数传递.
由于Sink.fold(...)返回最后一个求值表达式,您可以将空值Seq()作为第一个参数,迭代users并将元素附加到序列,最后得到结果.
但是,可能有一个更好的解决方案,可以创建一个Sink将每个评估表达式放入其中Seq,但我找不到它.
以下代码有效.
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source,Sink}
import scala.concurrent.ExecutionContext.Implicits.global
case class User(name:String)
object Main extends App{
implicit val system = ActorSystem("MyActorSystem")
implicit val materializer = ActorMaterializer()
val users = Seq(User("alice"),User("bob"),User("charlie"))
val sink = Sink.fold[Seq[User], User](Seq())(
(seq, elem) =>
{println(s"elem => ${elem} \t| seq => ${seq}");seq:+elem})
val src = Source(users.to[scala.collection.immutable.Seq])
// val src = Source(()=>users.iterator) // this also works
val fut = src.runWith(sink) // Future[Seq[User]]
fut.onSuccess({
case x=>{
println(s"result => ${x}")
}
})
}
Run Code Online (Sandbox Code Playgroud)
上面代码的输出是
elem => User(alice) | seq => List()
elem => User(bob) | seq => List(User(alice))
elem => User(charlie) | seq => List(User(alice), User(bob))
result => List(User(alice), User(bob), User(charlie))
Run Code Online (Sandbox Code Playgroud)