子线程没有看到主线程所做的更新

Nit*_*ndy 8 multithreading scala thread-safety apache-spark

我正在通过扩展Sp​​arkListener类来实现SparkHealthListener.

@Component
class ClusterHealthListener extends SparkListener with Logging {
  val appRunning = new AtomicBoolean(false)
  val executorCount = new AtomicInteger(0)

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) = {
    logger.info("Application Start called .. ")
    this.appRunning.set(true)
    logger.info(s"[appRunning = ${appRunning.get}]")
  }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = {
    logger.info("Executor add called .. ")
    this.executorCount.incrementAndGet()
    logger.info(s"[executorCount = ${executorCount.get}]")
  }
}
Run Code Online (Sandbox Code Playgroud)

appRunningexecutorCount是在ClusterHealthListener类中声明的两个变量.ClusterHealthReporterThread只会读取值.

@Component
class ClusterHealthReporterThread @Autowired() (healthListener: ClusterHealthListener) extends Logging {
  new Thread {
    override def run(): Unit = {
      while (true) {
          Thread.sleep(10 * 1000)
          logger.info("Checking range health")
          logger.info(s"[appRunning = ${healthListener.appRunning.get}] [executorCount=${healthListener.executorCount.get}]"
      }
    }
  }.start()
}
Run Code Online (Sandbox Code Playgroud)

无论主线程对变量所做的更改,ClusterHealthReporterThread总是报告初始值?我究竟做错了什么?这是因为我将healthListener注入ClusterHealthReporterThread吗?

更新

我玩了一下,看起来它与我启动火花听众的方式有关.

如果我像这样添加spark监听器

val sparkContext = SparkContext.getOrCreate(sparkConf) sparkContext.addSparkListener(healthListener)

父线程将始终将appRunning显示为"false",但正确显示执行程序计数.子线程(健康记者)也会显示正确的执行者计数,但appRunning总是报告'false',就像主线程一样.

然后我偶然发现了为什么SparkListenerApplicationStart永远不会被解雇?并尝试在spark配置级别设置监听器,

.set("spark.extraListeners", "HealthListener class path")

如果我这样做,主线程将为appRunning报告'true' 并报告正确的执行程序计数,但子线程将始终报告执行程序的'false'和'0'值.

Ric*_*ich 1

我无法立即看出这里出了什么问题,您可能已经发现了一个有趣的边缘情况。

我认为 @m4gic 的评论可能是正确的,日志记录库可能正在缓存该插值字符串?看起来您正在使用https://github.com/lightbend/scala-logging,它声称此插值“对行为没有影响”,所以可能不会。请您按照他的建议在不使用该功能的情况下重试并报告回来吗?

第二种可能:请问ClusterHealthListener系统中是否只有一个?也许自动装配导致创建第二个实例?您能否在两个位置记录引用的对象 IDClusterHealthListener并验证它们是否是同一对象?

如果这些建议都不能解决这个问题,您能否发布一个我可以使用的工作示例?