我正在使用databricks spark cluster(AWS),并测试我的scala实验.使用LogisticRegressionWithLBFGS算法训练10 GB数据时遇到了一些问题.我遇到问题的代码块如下:
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
val algorithm = new LogisticRegressionWithLBFGS()
algorithm.run(training_set)
Run Code Online (Sandbox Code Playgroud)
首先,我有很多执行程序丢失失败和java内存问题,然后我用更多分区重新分区我的training_set并且内存不足问题已经消失,但仍然得到执行程序丢失失败.
我的群集共有72个核心和500GB内存.任何人都能对此有所了解吗?
我们的气流安装使用 CeleryExecutor。并发配置是
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 16
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# Are DAGs paused by default at creation
dags_are_paused_at_creation = True
# When not using pools, tasks are run in the "default pool",
# whose size is guided by this …Run Code Online (Sandbox Code Playgroud) 我有一个MyThread对象,当我的应用程序通过服务器加载时,我将其实例化,我将其标记为守护程序线程,然后调用start()它.只要应用程序处于活动状态,该线程就会等待队列中的信息.我的问题/问题是:目前MyThread正在扩展Thread,因为我将它标记为守护进程,并且我读到了如何实现Runnable以及使用Executor更令人满意.所以我想问的是,如果MyThread将实现Runnable而不是扩展Thread(当然会被重命名),我将使用newSingleThreadScheduledExecutor()how,what或者where在哪里标记为Daemon.我希望我没有弄乱一些条款,请原谅我,如果因为多线程环境的某些部分对我来说是非常新的.
谢谢Ittai
更新:我在我的应用程序中引用的模块是一个Web应用程序,它实际上有几个这样的线程,它们的共同点是它们都是ServletContext作为成员出于各种原因.目前,我延长Thread到WebThread它具有ServletContext作为memebr和所有子类可以利用这一点.如果我切换到与执行者了Runnable范式和的ThreadFactory比基本上我需要有一个丑陋的混合动力车WebRunnable,它实现Runnable并具有ServletContext作为公共成员,并且对我的ThreadFactory落实newThread(WebRunnable arg0)除了newThread(Runnable arg0).我不确定什么是最好的.谢谢
Spark内存开销相关问题在SO中多次被问到,我经历了其中的大部分。然而,在浏览了多个博客后,我感到困惑。
以下是我的疑问
https://docs.qubole.com/en/latest/user-guide/engines/spark/defaults-executors.html https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
下面是我想了解的案例。我有5个节点,每个节点16个vcore和128GB内存(其中120个可用),现在我想提交spark应用程序,下面是conf,我在想
Total Cores 16 * 5 = 80
Total Memory 120 * 5 = 600GB
Run Code Online (Sandbox Code Playgroud)
情况1:执行器内存的内存开销部分
spark.executor.memory=32G
spark.executor.cores=5
spark.executor.instances=14 (1 for AM)
spark.executor.memoryOverhead=8G ( giving more than 18.75% which is default)
spark.driver.memoryOverhead=8G
spark.driver.cores=5
Run Code Online (Sandbox Code Playgroud)
情况 2:内存开销不是执行程序内存的一部分
spark.executor.memory=28G
spark.executor.cores=5
spark.executor.instances=14 (1 for AM)
spark.executor.memoryOverhead=6G ( giving more than 18.75% which is default)
spark.driver.memoryOverhead=6G
spark.driver.cores=5
Run Code Online (Sandbox Code Playgroud)
根据下面的视频,我尝试使用 85% 的节点,即 120GB 中的 100GB 左右,不确定我们是否可以使用更多。
https://www.youtube.com/watch?v=ph_2xwVjCGs&list=PLdqfPU6gm4b9bJEb7crUwdkpprPLseCOB&index=8&t=1281s (4:12)
我在使用Java预定执行程序时遇到了一个特殊问题,并且想知道我所经历的是否正常.
我需要安排以5秒的预定义速率执行的任务.预计这些任务将不时需要超过5秒的时间执行,但是当运行它们的时间低于5秒时,备份的任务列表应该快速连续运行以赶上.当运行的任务,重要的是要知道什么是原定的执行时间(考虑scheduledExecutionTime()中java.util.TimerTask).最后,我需要跟踪预定时间和实际时间之间的差异,以确定时间表"漂移"的时间和数量.
到目前为止,我已经使用Java执行器实现了所有这些,下面的类说明了一般的想法:
public class ExecutorTest {
public static final long PERIOD = 5000;
public static void main(String[] args) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new Command(), 0, PERIOD, TimeUnit.MILLISECONDS);
}
private static final class Command implements Runnable {
long timestamp = 0;
public void run() {
long now = System.currentTimeMillis();
if (timestamp == 0) {
timestamp = now;
}
// Drift is the difference between scheduled time and execution time
long drift = now - timestamp;
String format = …Run Code Online (Sandbox Code Playgroud) 我写了一些Java代码来了解有关Executor框架的更多信息.
具体来说,我编写了代码来验证Collatz假设 - 这表示如果迭代地将以下函数应用于任何整数,最终会得到1:
f(n)=((n%2)== 0)?n/2:3*n + 1
CH仍然未经证实,我认为这是了解Executor的好方法.为每个线程分配一个范围[l,u]的整数来检查.
具体来说,我的程序有3个参数--N(我要检查CH的数字),RANGESIZE(线程必须处理的间隔的长度)和NTHREAD,线程池的大小.
我的代码工作正常,但我看到的速度比我预期的要少得多 - 当我从1线程变为4线程时,大约为30%.
我的逻辑是计算完全是CPU绑定的,每个子任务(检查固定大小范围的CH)大致需要相同的时间.
有谁有想法为什么我没有看到速度增加3到4倍?
如果你可以在增加线程数量(以及机器,JVM和操作系统)时报告你的运行时,这也是很好的.
细节
运行时:
java -d64 -server -cp.Collatz 10000000 1000000 4 => 4个线程,需要28412毫秒
java -d64 -server -cp.Collatz 10000000 1000000 1 => 1个线程,需要38286毫秒
处理器:
Quadcore Intel Q6600,2.4GHZ,4GB.机器已卸载.
Java的:
java版"1.6.0_15"Java(TM)SE运行时环境(版本1.6.0_15-b03)Java HotSpot(TM)64位服务器VM(版本14.1-b02,混合模式)
OS:
Linux quad0 2.6.26-2-amd64#1 SMP Tue Mar 9 22:29:32 UTC 2010 x86_64 GNU/Linux
代码:(我无法获取要发布的代码,我认为对于SO要求来说太长了,可以在Google Docs上找到源代码
import java.math.BigInteger;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class MyRunnable implements Runnable {
public int …Run Code Online (Sandbox Code Playgroud) 我正在向Callablea 提交对象ThreadPoolExecutor,它们似乎在记忆中徘徊.
寻找与Eclipse的MAT工具堆转储看到Callable的物体正在被引用FutureTask$Sync的可调用的变量.这FutureTask$Sync是由一个被引用FutureTask的同步变量.这FutureTask是由引用FutureTask$Sync的这个$ 0变量.
我已经读过这个(这里,这里,以及SO),似乎FutureTask可调用包含在ThreadPoolExecutor's submit()中,它永远保存了对callable的引用.
我感到困惑的是如何确保FutureTask收集垃圾,以便它不会继续保持内存中的可调用内容,并保存可调用内容可能保留在内存中的任何内容?
只是为了提供有关我的特定情况的更多详细信息,我试图以ThreadPoolExecutor允许在需要时取消所有提交的任务的方式实现它.我尝试了好几种不同的方法,我发现在SO和其他地方,如完全查封了执行者(与shutdown(),shutdownNow()等),并保持期货的列表中返回submit(),并呼吁取消所有的人,然后清除期货的列表.理想情况下,我不想将其关闭,只需cancel()在需要时清除.
所有这些方法似乎没有什么区别.如果我向游泳池提交一个可调用的游戏,它很可能会最终粘在游泳池中.
我究竟做错了什么?
谢谢.
编辑:
根据要求,这是ThreadPoolExecutor的构造函数.
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
Run Code Online (Sandbox Code Playgroud)
经过进一步测试后,我可以看到,如果我让已经提交给ThreadPoolExecutor的任务完成,那么就没有泄漏.如果我试图取消它们,例如:
shutdownNow()
Run Code Online (Sandbox Code Playgroud)
或保存对未来的引用并稍后调用取消:
Future referenceToCancelLater = submit(task); …Run Code Online (Sandbox Code Playgroud) 我使用a ScheduledExecutorService来执行以固定速率调用服务的任务.该服务可能会将一些数据返回给任务.该任务将数据存储在队列中.其他一些线程慢慢从队列中挑选项目
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class EverlastingThread implements Runnable {
private ScheduledExecutorService executorService;
private int time;
private TimeUnit timeUnit;
private BlockingQueue<String> queue = new LinkedBlockingQueue<String>(500);
public EverlastingThread(ScheduledExecutorService executorService, int time, TimeUnit timeUnit) {
this.executorService = executorService;
this.time = time;
this.timeUnit = timeUnit;
}
public void run() {
// call the service. if Service returns any data put it an the queue
queue.add("task");
}
public void callService() throws Exception {
// while queue has …Run Code Online (Sandbox Code Playgroud) 我有以下简单的代码:
package main;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException {
new Main();
}
public Main() throws InterruptedException {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.schedule(new MyRunnable(), 10, TimeUnit.SECONDS);
System.out.println("Shutting down...");
executor.shutdown();
System.out.println("Awaiting termination...");
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
System.out.println("Main finished!");
}
private class MyRunnable implements Runnable {
public void run() {
System.out.println("Finished running!");
}
}
}
Run Code Online (Sandbox Code Playgroud)
实际上,虽然我的真实代码比这更复杂,但我可以在这些方面隔离问题.代码基本上等待10秒运行runnable,然后通知主程序的结束.
但是,我注意到了10秒钟,我的核心之一用于100%.
如果我评论这一行:
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
Run Code Online (Sandbox Code Playgroud)
cpu核心也以100%使用,主程序也在Runnable之前完成.
如果我评论这一行:
executor.shutdown();
Run Code Online (Sandbox Code Playgroud)
cpu已正确使用但程序无法完成.
如果我评论前两行,则cpu正确使用但主程序无法完成.
executor.shutdown();
做某种忙碌的等待而不是仅仅禁用提交新任务?附加细节:
$ java -version
java version "1.6.0_26" …Run Code Online (Sandbox Code Playgroud) 如果我运行持久的任务,如果第一个任务没有完成,Executor永远不会启动新的线程.有人可以帮我理解为什么以及如何解决这个问题?
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class TestExecutor {
@Test
public void test() throws InterruptedException {
ExecutorService checkTasksExecutorService = new ThreadPoolExecutor(1, 10,
100000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 20; i++) {
checkTasksExecutorService.execute(new Runnable() {
public void run(){
try {
System.out.println(Thread.currentThread().getName() + " running!");
Thread.sleep(10000);
} catch (Exception e) {
}
}
});
}
Thread.sleep(1000000);
}
}
Run Code Online (Sandbox Code Playgroud) executor ×10
java ×7
concurrency ×3
apache-spark ×2
airflow ×1
busy-waiting ×1
callable ×1
daemon ×1
futuretask ×1
hadoop-yarn ×1
linux ×1
memory-leaks ×1
pyspark ×1
scala ×1
scheduling ×1