Scala中的后台任务

Jeb*_*Jeb 31 multithreading scala

我有一个缓存,我想定期检查和修剪.在Java中,我会做以下事情:

new Thread(new Runnable() {
  void run() {
    while (true) { 
      Thread.sleep(1000);
      // clear the cache's old entries
    }
  }
}).start();
Run Code Online (Sandbox Code Playgroud)

当然,我有一些线程安全类型的问题可以用作缓存,但是把它放在一边,我的问题很简单.什么是Scala运行重复后台任务的方式 - 您不希望在应用程序的主线程中运行?

我曾经使用过一些演员,我猜我在这个场景中的问题是我没有任何东西可以生成一条消息,是时候清除缓存了.或者更确切地说,我可以想象生成这些消息的唯一方法是创建一个线程来执行它...

编辑:我需要人们对答案进行投票 - 他们对我来说都很好看

小智 24

有很多方法可以做到这一点,但我会做一些简单的事情,如下所示.

import scala.concurrent.ops._

spawn {
  while (true) { 
    Thread.sleep(1000);
    // clear the cache's old entries
  }
}
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助.

  • `scala.concurrent.ops`对象现已弃用(自版本2.10.0起). (10认同)
  • ..并确保处理异常 (5认同)
  • @Kai能否请您提供一些示例链接? (3认同)
  • @Alex来自文档:`使用Future代替 (2认同)

par*_*tic 11

您可以使用Akka Scheduler,它允许您向执行该作业的(akka)演员发送重新安排消息.从文档中,只需使用:

import akka.actor.Scheduler

//Sends messageToBeSent to receiverActor after initialDelayBeforeSending and then after each delayBetweenMessages
Scheduler.schedule(receiverActor, messageToBeSent, initialDelayBeforeSending, delayBetweenMessages, timeUnit)
Run Code Online (Sandbox Code Playgroud)

  • 我认为这是Akka早期版本的代码; 使用更新版本执行相同操作,请参阅http://doc.akka.io/docs/akka/current/java/scheduler.html (2认同)

Jus*_*s12 8

Futures是一种简单的方法,无需显式启动新线程

import scala.actors.Futures._

// main thread code here

future {
   // second thread code here
}

// main thread code here
Run Code Online (Sandbox Code Playgroud)

  • 期货是为_one time_任务而非_recurring_ tasks设计的.对于重复性任务而言,开销很大并且是不必要的. (14认同)
  • @RexKerr:但是关于基于"spawn"的接受答案的弃用信息给出了将来替换"spawn"的方向.那么Scala 2.10的正确答案是什么?@ Jus12:要完成这项工作,可能需要`import scala.concurrent.ExecutionContext.Implicits.global`.@akauppi:我会接受一个`while(true){...}`以便以某种方式实现反复出现...... (4认同)

Bil*_*ill 7

spawn 很好,但请注意您的示例代码也适用于Scala:

new Thread(new Runnable() {
  override def run() {
    while (true) { 
      Thread.sleep(1000);
      // clear the cache's old entries
    }
  }
}).start();
Run Code Online (Sandbox Code Playgroud)

只需用隐式转换清理它:

implicit def funcToRunnable(f: => ()) = new Runnable() { override def run() { f() } }

new Thread{
  while(true) {
    Thread.sleep(1000);
    // blah
  }
}.start()
Run Code Online (Sandbox Code Playgroud)


Joh*_*loo 5

没有绑定线程的Actors:

import actors.{TIMEOUT, Actor}
import actors.Actor._

private case class Ping( client: Actor, update: Int )
private case class Pulse()
case class Subscribe( actor: Actor )
case class Unsubscribe( actor: Actor )

class PulseActor extends Actor {
  def act = eventloop {
        case ping: Ping => { sleep(ping.update); ping.client ! Pulse }
  }
  def sleep(millis: Long) =
    receiveWithin(millis) {
      case TIMEOUT =>
  }
}

class ServiceActor extends Actor {

  var observers: Set[Actor] = Set.empty
  val pulseactor = new PulseActor
  val update = 2000

  def act = {
    pulseactor.start
    pulseactor ! new Ping( this, update )
    loop {
      react {
        case sub: Subscribe => observers += sub.actor
        case unsub: Unsubscribe => observers -= unsub.actor
        case Pulse => pulse
      }
    }
  }


  def pulse {  
    //cpuload = CPUprofile.getCPUload.getOrElse{ List(0.0) }  //real work
    observers foreach { observer => observer ! "CPUloadReport( cpuload )" }
    pulseactor ! Ping(this, update)
  }
}

object Exercise extends App {
  val deamon = new ServiceActor
  deamon.start
}
Run Code Online (Sandbox Code Playgroud)