虽然昨天纠正了我的SRP方式的错误,但我仍然想知道如何干净地保证对akka中的异步资源的单线程访问,例如文件句柄.显然,我不希望允许从不同的线程调度多个读写操作,但如果我的actor在该文件上调用了基于未来的API,那么可能会发生这种情况.
我想出的最好的模式是这样的:
trait AsyncIO {
def Read(offset: Int, count: Int) : Future[ByteBuffer] = ???
}
object GuardedIOActor {
case class Read(offset: Int, count: Int)
case class ReadResult(data: ByteBuffer)
private case class ReadCompleted()
}
class GuardedIOActor extends Actor with Stash with AsyncIO {
import GuardedIOActor._
var caller :Option[ActorRef] = None
def receive = {
case Read(offset,count) => caller match {
case None => {
caller = Some(sender)
Read(offset,count).onSuccess({
case data => {
self ! ReadCompleted()
caller.get ! ReadResult(data)
} …Run Code Online (Sandbox Code Playgroud) Akka / Scala Future背后的理念是,每当我们找到阻塞的代码段(例如IO调用,网络调用等)时,我们都必须将其包装在一个Future中,并在某个时间点之后异步获取结果。但是,之前阻塞主线程的阻塞代码块现在阻塞了Future支持的单独线程。那Akka / Scala Future买了什么。
val blockingCallResult: Result = block() //blocks the thread of execution.
now let's use Akka/Scala future and wrap the blocking call with Future
val future = Future[Result] {
val blockingCallResult: Result = block() //also blocks on some thread in thread pool
blockingCallResult
}
Run Code Online (Sandbox Code Playgroud)
我们如何通过利用未来而受益。
在我们的项目中,我们需要执行一个侦听队列并在循环中处理即将发生的消息的任务,该循环永远不会完成.代码看起来像:
def processQueue = {
while(true) {
val message = queue.next();
processMessage(message) match {
case Success(_) => ...
case _ => ...
}
}
}
Run Code Online (Sandbox Code Playgroud)
所以我们想在一个单独的线程中运行它.
我可以想象两种方法,一种是用Thread我们在Java中做的:
new Thread(new Runnable() { processQueue() }).start();
Run Code Online (Sandbox Code Playgroud)
另一种方式是使用Future(我们现在这样做):
Future { processQueue }
Run Code Online (Sandbox Code Playgroud)
我只是想知道Future在这种情况下使用是否正确,因为我知道(这可能是错误的),Future意味着要运行一些任务,将在未来的某个时间完成或返回结果.但我们的任务永远不会完成.
我也想知道scala中最好的解决方案是什么.
我正在实现Future共享计算的接口.即一个线程执行计算,其他线程需要相同的结果只需要通过Future.所以,我已经阅读了Object#wait()方法文档并确定它完全满足我的需求.以下是我对Future#get()方法的实现方式:
public class CustomFuture implements Future<Collection<Integer>> {
private AtomicBoolean started;
private Exception computationException;
private boolean cancelled;
private Collection<Integer> computationResult;
private Object waitForTheResult = new Object();
public Collection<Integer> get(){
if(started.compareAndSet(false, true))
//start the computation //notifyAll() is called after finishing the computation here.
while(computationResult == null){
if(cancelled)
throw new CancellationException();
if(computationException != null)
throw new ExectuonException();
synchronized(waitForTheResult){
waitForTheResult.wait();
}
}
return computationResult;
}
//The rest of methods
}
Run Code Online (Sandbox Code Playgroud)
由于依赖于低级原语,我不确定实现是否良好.我认为,根据经验,我们应该避免使用这种低级原语.也许这是合理的情况.
也许有一个更好的选择wait()在java.util.concurrent.
对于Seq 包含在a中的给定Future,例如
import scala.concurrent._
import ExecutionContext.Implicits.global
val xs = Future { Seq(1,2,3) }
Run Code Online (Sandbox Code Playgroud)
如何将集合中的第一个(仅一个)值提取到另一个中Future,即
Future { 1 }
Run Code Online (Sandbox Code Playgroud) 我有一个大的计算大致基于以下模式:
def f1(i:Int):Int = ???
def f2(i:Int):Int = ???
def processA(l: List[Int]) =
l.map(i => Future(f1(i)))
def processB(l: List[Int]) = {
val p = processA(l)
p.map(fut => fut.map(f2))
}
def main() = {
val items = List( /* 1k to 10k items here */ )
val results = processB(items)
results.map(_.onComplete ( ... ))
}
Run Code Online (Sandbox Code Playgroud)
如果我的理解是正确的,我遇到的问题是处理是广度优先的.ProcessA启动了数千个期货,然后processB将汇集数千个新的期货,这些期货将在processA完成后处理.onComplete回调将开始很晚才开始...
我想把这个深度优先:过程A的几个未来开始,然后,processB从那里继续而不是切换到队列中的其他东西.
可以在香草scala中完成吗?我应该转向一些替代Futures()和ThreadPools的lib吗?
编辑:更详细一点.f1 andThen f2正如答案中所建议的那样,重写目前是不切实际的.实际上,processA and B正在做一堆其他事情(包括副作用).而processB依赖的事实ProcessA是私人的.如果曝光,它会破坏SoC.
编辑2:我想我会放松一点"香草"约束.有人建议Akka流可以提供帮助.我目前正在看scalaz.Task:有意见吗?
说我有一个函数,它接受某种Option [] ...即:
def help(x: Int,
y : Option[BigInteger],
ec: ExecutionContext,
sc: SecurityContext): Future[Long] = { ... }
Run Code Online (Sandbox Code Playgroud)
我有一个用地图调用它的对象,比方说
val answerList: List[Future[Long]] = random.getPersons
.map(p => help(x , myY, ec, sc))
.collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)
我说"myY"就是这么说的
类型不匹配,预期Option [BigInteger],实际:BigInteger.
当我的帮助方法选择类型时,我会看到它的来源.
我尝试通过选择[myY]来投射myY,但这似乎没有帮助.假设帮助方法正确实施,有人可以帮助我或指出正确的方向吗?谢谢!
我刚刚更新了Flutter,并成功从中下载了我的原始项目git。现在我收到一个奇怪的Future错误。我可以在github上在线看到它,但是没有关于如何修复的明确答案。该项目甚至没有加载。它从我的main.dart文件中读取Future语句并返回此...
[VERBOSE-2:dart_error.cc(16)]未处理的异常:类型'Future dynamic'不是类型'Future String'的子类型,其中
Future来自dart:async
Future来自dart:async
字符串来自dart:core
***不确定错误在哪里。我的飞镖分析说“仅等待期货”。这是我的小部件开始构建之前运行的期货代码...
Future<Null> getData() async{
FirebaseUser user = await FirebaseAuth.instance.currentUser;
user = FirebaseAuth.instance.currentUser;
var userid = user.uid;
fb.child('users/${userid}').onValue.listen((Event event) {
if (event.snapshot.value != null) {
name = event.snapshot.value['displayName'];
image = event.snapshot.value['image'];
} else {
name = "User";
}
});
}
Run Code Online (Sandbox Code Playgroud) 我有一个方法.此方法可能返回Future.failed(.....)或Future.successful(()).
def calculate(x:Int,y:Int):Future [Unit] = {........}
现在我需要测试这个方法.断言验证Future.successful(())案例的测试的最佳方法是什么 .
// First
import concurrent.Future
import concurrent.ExecutionContext.Implicits.global
for {
_ <- Future { Thread.sleep(3000); println("a") }
_ <- Future { Thread.sleep(2000); println("b") }
_ <- Future { Thread.sleep(1000); println("c") }
} {}?
// Second
?import concurrent.Future
import concurrent.ExecutionContext.Implicits.global
val future1 = Future { Thread.sleep(3000); println("a") }
val future2 = Future { Thread.sleep(2000); println("b") }
val future3 = Future { Thread.sleep(1000); println("c") }
for {
_ <- future1
_ <- future2
_ <- future3
} {}?
Run Code Online (Sandbox Code Playgroud) future ×10
scala ×8
akka ×2
asynchronous ×2
assert ×1
concurrency ×1
dart ×1
dynamic ×1
flutter ×1
io ×1
java ×1
option ×1
scalatest ×1
threadpool ×1