如何在Future回调中保存和返回数据

Rec*_*r80 1 asynchronous scala akka concurrent.futures lagom

在过去的几天里,我一直面临着关于在Scala中保存和处理Futures数据的问题.我是两种语言和概念的新手.Lagom关于Cassandra的文档表示要实现大约9个代码文件,我希望确保我的数据库代码能够在分散这么多代码之前完成.

具体来说,我正在尝试实现一个概念验证,以便向/从cassandra数据库发送数据,这些数据库是为您实现的.到目前为止,我能够向/从数据库发送和检索数据,但是我无法返回该数据,因为这一切都是异步运行的,并且还返回数据成功返回.

我已经玩了一会儿; 检索代码如下所示:

override def getBucket(logicalBucket: String) = ServiceCall[NotUsed, String] {
request => {
  val returnList = ListBuffer[String]()

  println("Retrieving Bucket " + logicalBucket)
  val readingFromTable = "SELECT * FROM data_access_service_impl.s3buckets;"

  //DB query
  var rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
  println(rowsFuture)

  Await.result(rowsFuture, 10 seconds)

  rowsFuture onSuccess {
    case rows => {
      println(rows)
      for (row <- rows) println(row.getString("name"))
      for (row <- rows) returnList += row.getString("name")
      println("ReturnList: " + returnList.mkString)
    }
  }

  rowsFuture onFailure {
    case e => println("An error has occured: " + e.getMessage)
    Future {"An error has occured: " + e.getMessage}
  } 

  Future.successful("ReturnList: " + returnList.mkString)
 }      
}
Run Code Online (Sandbox Code Playgroud)

当它运行时,我在onSuccess回调中获得预期的数据库值为'println'.但是,我在return语句中使用的同一个变量在回调之外打印为空(并返回空数据).这也发生在我使用的'insert'函数中,它并不总是返回我在回调函数中设置的变量.

如果我尝试将语句放在回调函数中,我会收到错误'返回单位,期望Future [String]'.所以我陷入了无法从回调函数中返回的地方,因此我无法保证我将返回数据).

我的目标是将一个字符串返回给API,以便显示数据库中所有s3存储桶名称的列表.这意味着迭代Future [Seq [Row]]数据类型,并将数据保存为连接字符串.如果有人可以提供帮助,他们将解决我通过Lagom,Akka,Datastax和Cassandra文档阅读的2周问题.我在这一点上大吃一惊(信息过载),我没有找到关于此的明确指南.

作为参考,这是cassandraSession文档:

LagomTutorial/Documentation样式信息及其唯一的cassandra查询示例 CassandraSession.scala代码

Sea*_*ira 7

要了解有关事情的关键Future,(和OptionEitherTry)是你不(一般)获取值他们,你把计算他们.最常见的方法是使用mapflatMap方法.

在您的情况下,您想要Seq[Row]将其转换为a String.但是,你Seq[Row]被包含在这个不透明的数据结构中Future,所以你不能rows.mkString像你实际拥有的那样Seq[Row].因此,不是获取值并对其执行计算,而是将计算带入rows.mkString数据:

//DB query
val rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
val rowToString = (row: Row) => row.getString("name")
val computation = (rows: Seq[Row]) => rows.map(rowToString).mkString

// Computation to the data, rather than the other way around
val resultFuture = rowsFuture.map(computation)
Run Code Online (Sandbox Code Playgroud)

现在,当rowsFuture完成后,你通过调用创建新的未来rowsFuture.map将会实现与调用的结果computationSeq[Row]真正关心的.

那时你就可以了return resultFuture,一切都会按预期工作,因为调用的代码getBucket期望a Future并且会在适当的时候处理它.

为什么Future不透明?

原因很简单,因为它代表了当前可能不存在的值.您只能在值存在时获取值,但是当您开始通话时它不存在.isComplete代码不是让你自己轮询一些字段,而是让你注册计算(回调,比如onSuccessonFailure)或使用map和创建新的派生未来值flatMap.

更深层次的原因是因为FutureMonad和monads包含计算,但没有从中提取计算的操作