Rec*_*r80 1 asynchronous scala akka concurrent.futures lagom
在过去的几天里,我一直面临着关于在Scala中保存和处理Futures数据的问题.我是两种语言和概念的新手.Lagom关于Cassandra的文档表示要实现大约9个代码文件,我希望确保我的数据库代码能够在分散这么多代码之前完成.
具体来说,我正在尝试实现一个概念验证,以便向/从cassandra数据库发送数据,这些数据库是为您实现的.到目前为止,我能够向/从数据库发送和检索数据,但是我无法返回该数据,因为这一切都是异步运行的,并且还返回数据成功返回.
我已经玩了一会儿; 检索代码如下所示:
override def getBucket(logicalBucket: String) = ServiceCall[NotUsed, String] {
request => {
val returnList = ListBuffer[String]()
println("Retrieving Bucket " + logicalBucket)
val readingFromTable = "SELECT * FROM data_access_service_impl.s3buckets;"
//DB query
var rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
println(rowsFuture)
Await.result(rowsFuture, 10 seconds)
rowsFuture onSuccess {
case rows => {
println(rows)
for (row <- rows) println(row.getString("name"))
for (row <- rows) returnList += row.getString("name")
println("ReturnList: " + returnList.mkString)
}
}
rowsFuture onFailure {
case e => println("An error has occured: " + e.getMessage)
Future {"An error has occured: " + e.getMessage}
}
Future.successful("ReturnList: " + returnList.mkString)
}
}
Run Code Online (Sandbox Code Playgroud)
当它运行时,我在onSuccess回调中获得预期的数据库值为'println'.但是,我在return语句中使用的同一个变量在回调之外打印为空(并返回空数据).这也发生在我使用的'insert'函数中,它并不总是返回我在回调函数中设置的变量.
如果我尝试将语句放在回调函数中,我会收到错误'返回单位,期望Future [String]'.所以我陷入了无法从回调函数中返回的地方,因此我无法保证我将返回数据).
我的目标是将一个字符串返回给API,以便显示数据库中所有s3存储桶名称的列表.这意味着迭代Future [Seq [Row]]数据类型,并将数据保存为连接字符串.如果有人可以提供帮助,他们将解决我通过Lagom,Akka,Datastax和Cassandra文档阅读的2周问题.我在这一点上大吃一惊(信息过载),我没有找到关于此的明确指南.
作为参考,这是cassandraSession文档:
LagomTutorial/Documentation样式信息及其唯一的cassandra查询示例 CassandraSession.scala代码
要了解有关事情的关键Future,(和Option和Either和Try)是你不(一般)获取值了他们,你把计算到他们.最常见的方法是使用map和flatMap方法.
在您的情况下,您想要Seq[Row]将其转换为a String.但是,你Seq[Row]被包含在这个不透明的数据结构中Future,所以你不能rows.mkString像你实际拥有的那样Seq[Row].因此,不是获取值并对其执行计算,而是将计算带入rows.mkString数据:
//DB query
val rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
val rowToString = (row: Row) => row.getString("name")
val computation = (rows: Seq[Row]) => rows.map(rowToString).mkString
// Computation to the data, rather than the other way around
val resultFuture = rowsFuture.map(computation)
Run Code Online (Sandbox Code Playgroud)
现在,当rowsFuture完成后,你通过调用创建新的未来rowsFuture.map将会实现与调用的结果computation对Seq[Row]你真正关心的.
那时你就可以了return resultFuture,一切都会按预期工作,因为调用的代码getBucket期望a Future并且会在适当的时候处理它.
Future不透明?原因很简单,因为它代表了当前可能不存在的值.您只能在值存在时获取值,但是当您开始通话时它不存在.isComplete代码不是让你自己轮询一些字段,而是让你注册计算(回调,比如onSuccess和onFailure)或使用map和创建新的派生未来值flatMap.
更深层次的原因是因为FutureMonad和monads包含计算,但没有从中提取计算的操作
| 归档时间: |
|
| 查看次数: |
967 次 |
| 最近记录: |