标签: future

Akka:守护异步资源

虽然昨天纠正了我的SRP方式的错误,但我仍然想知道如何干净地保证对akka中的异步资源的单线程访问,例如文件句柄.显然,我不希望允许从不同的线程调度多个读写操作,但如果我的actor在该文件上调用了基于未来的API,那么可能会发生这种情况.

我想出的最好的模式是这样的:

trait AsyncIO {
  def Read(offset: Int, count: Int) : Future[ByteBuffer] = ???
}

object GuardedIOActor {
  case class Read(offset: Int, count: Int)
  case class ReadResult(data: ByteBuffer)
  private case class ReadCompleted()
}

class GuardedIOActor extends Actor with Stash with AsyncIO {
  import GuardedIOActor._
  var caller :Option[ActorRef] = None

  def receive = {
    case Read(offset,count) => caller match {
      case None => {
        caller = Some(sender)
        Read(offset,count).onSuccess({
          case data => {
            self ! ReadCompleted()
            caller.get ! ReadResult(data)
          } …
Run Code Online (Sandbox Code Playgroud)

io asynchronous scala future akka

0
推荐指数
1
解决办法
229
查看次数

封装在Akka Future中的代码也会阻塞Future支持的线程,那么Future在这种情况下有什么帮助

Akka / Scala Future背后的理念是,每当我们找到阻塞的代码段(例如IO调用,网络调用等)时,我们都必须将其包装在一个Future中,并在某个时间点之后异步获取结果。但是,之前阻塞主线程的阻塞代码块现在阻塞了Future支持的单独线程。那Akka / Scala Future买了什么。

val blockingCallResult: Result = block() //blocks the thread of execution.

now let's use Akka/Scala future and wrap the blocking call with Future


val future = Future[Result] {

   val blockingCallResult: Result = block() //also blocks on some thread in thread pool

   blockingCallResult

}
Run Code Online (Sandbox Code Playgroud)

我们如何通过利用未来而受益。

scala future akka

0
推荐指数
1
解决办法
824
查看次数

使用`Future`运行一些永不完成的循环任务是否正确?

在我们的项目中,我们需要执行一个侦听队列并在循环中处理即将发生的消息的任务,该循环永远不会完成.代码看起来像:

def processQueue = {
  while(true) {
    val message = queue.next();
    processMessage(message) match {
      case Success(_) => ...
      case _ => ...
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

所以我们想在一个单独的线程中运行它.

我可以想象两种方法,一种是用Thread我们在Java中做的:

new Thread(new Runnable() { processQueue() }).start();
Run Code Online (Sandbox Code Playgroud)

另一种方式是使用Future(我们现在这样做):

Future { processQueue }
Run Code Online (Sandbox Code Playgroud)

我只是想知道Future在这种情况下使用是否正确,因为我知道(这可能是错误的),Future意味着要运行一些任务,将在未来的某个时间完成或返回结果.但我们的任务永远不会完成.

我也想知道scala中最好的解决方案是什么.

scala future

0
推荐指数
1
解决办法
154
查看次数

是否正在使用低等原语如等待被认为是坏的?

我正在实现Future共享计算的接口.即一个线程执行计算,其他线程需要相同的结果只需要通过Future.所以,我已经阅读了Object#wait()方法文档并确定它完全满足我的需求.以下是我对Future#get()方法的实现方式:

public class CustomFuture implements Future<Collection<Integer>> {

    private AtomicBoolean started;
    private Exception computationException;
    private boolean cancelled;
    private Collection<Integer> computationResult;
    private Object waitForTheResult = new Object();

    public Collection<Integer> get(){
        if(started.compareAndSet(false, true))
            //start the computation    //notifyAll() is called after finishing the computation here.
        while(computationResult == null){
            if(cancelled)
                throw new CancellationException();
            if(computationException != null)
                throw new ExectuonException();
            synchronized(waitForTheResult){
                waitForTheResult.wait();
            }
        }
        return computationResult;
    }             
    //The rest of methods
}
Run Code Online (Sandbox Code Playgroud)

由于依赖于低级原语,我不确定实现是否良好.我认为,根据经验,我们应该避免使用这种低级原语.也许这是合理的情况.

也许有一个更好的选择wait()java.util.concurrent.

java multithreading future

0
推荐指数
1
解决办法
154
查看次数

从未来[Seq [T]]到未来[T]的Scala提取值

对于Seq 包含在a中的给定Future,例如

import scala.concurrent._
import ExecutionContext.Implicits.global

val xs = Future { Seq(1,2,3) }
Run Code Online (Sandbox Code Playgroud)

如何将集合中的第一个(仅一个)值提取到另一个中Future,即

Future { 1 }
Run Code Online (Sandbox Code Playgroud)

scala future

0
推荐指数
1
解决办法
181
查看次数

scala未来的处理深度 - 首先不是广度优先

我有一个大的计算大致基于以下模式:

def f1(i:Int):Int = ???
def f2(i:Int):Int = ???

def processA(l: List[Int]) = 
  l.map(i => Future(f1(i)))

def processB(l: List[Int]) = {
  val p = processA(l)
  p.map(fut => fut.map(f2))
}

def main() = {
  val items = List( /* 1k to 10k items here */ )
  val results = processB(items)
  results.map(_.onComplete ( ... ))
}
Run Code Online (Sandbox Code Playgroud)

如果我的理解是正确的,我遇到的问题是处理是广度优先的.ProcessA启动了数千个期货,然后processB将汇集数千个新的期货,这些期货将在processA完成后处理.onComplete回调将开始很晚才开始...

我想把这个深度优先:过程A的几个未来开始,然后,processB从那里继续而不是切换到队列中的其他东西.

可以在香草scala中完成吗?我应该转向一些替代Futures()和ThreadPools的lib吗?

编辑:更详细一点.f1 andThen f2正如答案中所建议的那样,重写目前是不切实际的.实际上,processA and B正在做一堆其他事情(包括副作用).而processB依赖的事实ProcessA是私人的.如果曝光,它会破坏SoC.

编辑2:我想我会放松一点"香草"约束.有人建议Akka流可以提供帮助.我目前正在看scalaz.Task:有意见吗?

scala future threadpool

0
推荐指数
1
解决办法
478
查看次数

在Scala上使用带有选项返回类型的映射

说我有一个函数,它接受某种Option [] ...即:

def help(x: Int, 
         y : Option[BigInteger], 
         ec: ExecutionContext, 
         sc: SecurityContext): Future[Long] = { ... }
Run Code Online (Sandbox Code Playgroud)

我有一个用地图调用它的对象,比方说

val answerList: List[Future[Long]] =  random.getPersons
       .map(p => help(x , myY, ec, sc))
       .collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)

我说"myY"就是这么说的

类型不匹配,预期Option [BigInteger],实际:BigInteger.

当我的帮助方法选择类型时,我会看到它的来源.

我尝试通过选择[myY]来投射myY,但这似乎没有帮助.假设帮助方法正确实施,有人可以帮助我或指出正确的方向吗?谢谢!

scala future option

0
推荐指数
1
解决办法
59
查看次数

Flutter Future &lt;dynamic&gt;与Future &lt;String&gt;子类型错误?

我刚刚更新了Flutter,并成功从中下载了我的原始项目git。现在我收到一个奇怪的Future错误。我可以在github上在线看到它,但是没有关于如何修复的明确答案。该项目甚至没有加载。它从我的main.dart文件中读取Future语句并返回此...

[VERBOSE-2:dart_error.cc(16)]未处理的异常:类型'Future dynamic'不是类型'Future String'的子类型,其中
Future来自dart:async
Future来自dart:async
字符串来自dart:core

***不确定错误在哪里。我的飞镖分析说“仅等待期货”。这是我的小部件开始构建之前运行的期货代码...

Future<Null> getData() async{
    FirebaseUser user = await FirebaseAuth.instance.currentUser;
    user = FirebaseAuth.instance.currentUser;
    var userid = user.uid;
    fb.child('users/${userid}').onValue.listen((Event event) {
      if (event.snapshot.value != null) {
        name = event.snapshot.value['displayName'];
        image = event.snapshot.value['image'];
      } else {
        name = "User";
      }
    });
  }
Run Code Online (Sandbox Code Playgroud)

asynchronous future dynamic dart flutter

0
推荐指数
1
解决办法
7505
查看次数

断言返回类型为Future [Unit]的scala方法的最佳方法是什么?

我有一个方法.此方法可能返回Future.failed(.....)或Future.successful(()).

def calculate(x:Int,y:Int):Future [Unit] = {........}

现在我需要测试这个方法.断言验证Future.successful(())案例的测试的最佳方法是什么 .

assert scala future scalatest

0
推荐指数
1
解决办法
944
查看次数

以下并发Scala程序的输出之间有什么区别

// First
import concurrent.Future
import concurrent.ExecutionContext.Implicits.global
for {
  _ <- Future { Thread.sleep(3000); println("a") }
  _ <- Future { Thread.sleep(2000); println("b") }
  _ <- Future { Thread.sleep(1000); println("c") }
} {}?

// Second
?import concurrent.Future
import concurrent.ExecutionContext.Implicits.global
val future1 = Future { Thread.sleep(3000); println("a") }
val future2 = Future { Thread.sleep(2000); println("b") }
val future3 = Future { Thread.sleep(1000); println("c") }
for {
  _ <- future1
  _ <- future2
  _ <- future3
} {}?
Run Code Online (Sandbox Code Playgroud)

concurrency multithreading scala future

0
推荐指数
1
解决办法
67
查看次数