小编yjs*_*hen的帖子

以编程方式从另一个应用程序提交并终止Spark应用程序

我想知道是否可以从其他服务提交,监控终止 spark应用程序.

我的要求如下:

我写了一个服务

  1. 解析用户命令
  2. 它们转换为已经准备好的Spark-SQL应用程序的可理解参数
  3. 使用spark-submitfrom 将应用程序连同参数一起提交给Spark ClusterProcessBuilder
  4. 并计划在集群模式下运行生成的应用程序驱动程序.

其他要求需要:

  • 查询应用程序状态,例如,百分比仍然存在
  • 适当的方式终止查询

我在spark独立文档中找到的建议使用以下命令终止应用程序:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
Run Code Online (Sandbox Code Playgroud)

而且应该 find the driver ID through the standalone Master web UI at http://<master url>:8080.

那么,我该怎么办?

相关的SO问题:
Spark应用程序完成回调
从Java中的另一个应用程序部署Apache Spark应用程序,最佳实践

apache-spark

7
推荐指数
3
解决办法
1万
查看次数

根据当前运行环境获取tokio运行时句柄的正确方法

根据当前运行环境获取 tokio 运行时句柄的惯用方法是什么?

  • 对于已经在 tokio 运行时运行的方法,我想使用Handle.try_current().unwrap()它来获取当前的方法。
  • 对于不在 tokio 中运行的方法,我可以创建一个新方法:Runtime::new().unwrap().handle()

但是,当我将代码编写为:

fn get_runtime_handle() -> Handle {
    match Handle::try_current() {
        Ok(h) => h,
        Err(_) => Runtime::new().unwrap().handle().clone(),
    }
}

async fn a_async() -> Result<()> {
    ....
}

fn a() -> Result<()> {
   let handle = get_runtime_handle();
   handle.block_one (async { a_async().await; })
}

fn main() -> Result<()> {
    a();

    Ok(())
}

Run Code Online (Sandbox Code Playgroud)

tokio::fs::read_dir在内部调用,代码崩溃Error: Custom { kind: Other, error: "background task failed" }

当我handle.block_onRuntime::new().unwrap().handle().block_onin …

rust rust-tokio

7
推荐指数
1
解决办法
4800
查看次数

Scalac:在IDEA中运行Scalatest时断言失败

我在Intellij IDEA中运行ScalaTest Suite,在scala测试之前的make阶段,我遇到了这个问题:

Error:scalac: Error: assertion failed: List(object package$DebugNode, object package$DebugNode)
      java.lang.AssertionError: assertion failed: List(object package$DebugNode, object package$DebugNode)
       at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678)
       at scala.reflect.internal.Symbols$ClassSymbol.companionModule0(Symbols.scala:2988)
       at scala.reflect.internal.Symbols$ClassSymbol.companionModule(Symbols.scala:2991)
       at scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1371)
       at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:120)
       at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1583)
       at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1557)
       at scala.tools.nsc.Global$Run.compileSources(Global.scala:1553)
       at scala.tools.nsc.Global$Run.compile(Global.scala:1662)
       at xsbt.CachedCompiler0.run(CompilerInterface.scala:126)
       at xsbt.CachedCompiler0.run(CompilerInterface.scala:102)
       at xsbt.CompilerInterface.run(CompilerInterface.scala:27)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:606)
       at sbt.compiler.AnalyzingCompiler.call(AnalyzingCompiler.scala:102)
       at sbt.compiler.AnalyzingCompiler.compile(AnalyzingCompiler.scala:48)
       at sbt.compiler.AnalyzingCompiler.compile(AnalyzingCompiler.scala:41)
       at org.jetbrains.jps.incremental.scala.local.IdeaIncrementalCompiler.compile(IdeaIncrementalCompiler.scala:28)
       at org.jetbrains.jps.incremental.scala.local.LocalServer.compile(LocalServer.scala:25)
       at org.jetbrains.jps.incremental.scala.remote.Main$.make(Main.scala:64)
       at org.jetbrains.jps.incremental.scala.remote.Main$.nailMain(Main.scala:22)
       at org.jetbrains.jps.incremental.scala.remote.Main.nailMain(Main.scala)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:606)
       at com.martiansoftware.nailgun.NGSession.run(NGSession.java:319)
Run Code Online (Sandbox Code Playgroud)

由于增量编译,它是一个jetbrains错误吗?或者它与我运行的测试套件有关?堆栈跟踪中的任何信息都与我测试的TestSuite /项目无关.

scala jetbrains-ide intellij-idea apache-spark

5
推荐指数
1
解决办法
2202
查看次数

如何避免在 Apache Pulsar 中自动删除非活动主题

我有一个应用程序,它在特定主题下向 Pulsar 生成消息,并在完成后关闭该应用程序;同时,不存在阅读此主题的消费者。

过了一会儿,当我创建一个consumer,想把写入的数据读出来的时候,发现我写的topic被Pulsar删除了,所有的数据都丢失了。

如何禁用 Pulsar 中非活动主题的自动删除?

message-queue apache-pulsar

4
推荐指数
1
解决办法
994
查看次数

演员实例数

我是 akka-actor 的新手,对一些问题感到困惑:

  1. 当我创建一个actorSystem,并使用actorOf(Props(classOf[AX], ...)) 在main 方法中创建actor 时,我的actor AX 有多少个实例?
  2. 如果 Q1 的答案只有一个,这是否意味着我在 AX 演员类的定义中创建的任何数据结构只会出现在一个线程中,我不应该担心并发问题?
  3. 如果我的演员的一个动作(接收方法中的一个案例)是一项耗时的任务并且需要很长时间才能完成怎么办?我的单个 Actor 实例在完成该任务之前不会响应吗?
  4. 如果 Q3 的答案是正确的,我应该怎么做才能防止我的演员没有回应?我应该启动另一个线程并向它发送另一条消息直到完成任务吗?有没有我应该遵循的最佳实践?

scala actor akka

1
推荐指数
1
解决办法
128
查看次数

如何概括圆方法

我有以下四种方法,使用BigDecimal来舍入一个数字:

private def round(input: Byte, scale: Int): Byte = {
  BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).byteValue()
}

private def round(input: Short, scale: Int): Short = {
  BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).shortValue()
}

private def round(input: Int, scale: Int): Int = {
  BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).intValue()
}

private def round(input: Long, scale: Int): Long = {
  BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).longValue()
}
Run Code Online (Sandbox Code Playgroud)

并计划将其概括为一轮:

private def round[T](input: Any, scale: Int, f: (BigDecimal) => T): T = {
  f(BigDecimal(input.asInstanceOf[T]).setScale(scale, RoundingMode.HALF_UP))
}
Run Code Online (Sandbox Code Playgroud)

并像这样使用这一轮:

round[Byte](b, scale, _.byteValue)
round[Short](s, scale, _.shortValue)
Run Code Online (Sandbox Code Playgroud)

但上述概括round无效,因为BigDecimal.apply不能适用T,我应该做什么?

scala

0
推荐指数
1
解决办法
118
查看次数