我正在实现一个parellel quicksort作为编程实践,在我完成后,我阅读了Executors上的Java教程页面,听起来他们可以让我的代码更快.不幸的是,我依靠join()来确保程序不会继续,直到所有内容都被排序.现在我正在使用:
public static void quicksort(double[] a, int left, int right) {
if (right <= left) return;
int i = partition(a, left, right);
// threads is an AtomicInteger I'm using to make sure I don't
// spawn a billion threads.
if(threads.get() < 5){
// ThreadSort's run method just calls quicksort()
Future leftThread = e.submit(new ThreadSort(a, left, i-1));
Future rightThread = e.submit(new ThreadSort(a, i+1, right));
threads.getAndAdd(2);
try {
leftThread.get();
rightThread.get();
}
catch (InterruptedException ex) {}
catch (ExecutionException ex) {}
}
else{ …Run Code Online (Sandbox Code Playgroud) 我正在寻找一种计算 python 3.2 futures.ThreadPoolExecutor 中挂起任务数量的方法。
直到知道,我有两个解决方案:1.- 在提交时增加待处理任务的计数并添加回调以在任务完成时减少计数 2.- 保留期货列表并检查哪些仍处于待处理状态
有没有更直接的东西(即执行者的属性或方法)?
提前致谢
我试图理解为Java 5的ThreadPoolExecutor指定单独的核心和最大池大小的重点.我的理解是,一旦队列满了,线程数就会增加,这似乎有点晚(至少有较大的队列).
是不是我很高兴为任务分配更多的线程,在这种情况下,我可能只是增加核心池大小; 或者我不是真的愿意这样做,在这种情况下我宁愿有更大的队列?什么是单独的核心和最大池大小有用的场景?
我希望能够在Java中的特定时间安排任务.我知道ExecutorService有能力定期安排,并在指定的延迟后安排,但我看的时间比一段时间更长.
有没有办法Runnable在2:00 进行执行,或者我是否需要计算从现在到2:00之间的时间,然后安排runnable在延迟后执行?
到目前为止,在我对 executorservice 的实验中,我有很多建议涉及使用 future.get,然后使用 future.cancel 来抛出一个线程中断,然后需要在线程中捕获该中断并在那里处理。我的问题有点不同。
假设我正在运行一个线程,它只是跟踪事情运行了多长时间,如果它们超过某个阈值,这是否是杀死 executorservice 和所有正在运行的线程的好方法?
示例思维过程:
ExecutorService threadPool = Executors.newFixedThreadPool(12);
timekeeper.start();
List<Future<?>> taskList = new ArrayList<Future<?>>();
for (int i = 0; i < objectArray.length; i++) {
Future<?> task = threadPool.submit(new ThreadHandler(objectArray[i], i));
taskList.add(task);
Thread.sleep(500);
}
if(timekeeper.now() > 60)
threadpool.shutdownNow();
}
Run Code Online (Sandbox Code Playgroud)
这行得通吗?我无法检查,因为我的线程很少失败(大约 1/700 运行,并且仅在我不工作的一天中的特定时间)。
执行程序可以在单个线程上运行多个任务吗?
显然,只有在一个物理核心上运行才能同时执行任务,但是有没有办法等待或让步,以便其他提交的任务可以运行?
如果没有等待,那么一个人通常还能确定另一项任务何时运行?
我编写了一个Spark应用程序,它通过SparkConf实例设置一些配置内容,如下所示:
SparkConf conf = new SparkConf().setAppName("Test App Name");
conf.set("spark.driver.cores", "1");
conf.set("spark.driver.memory", "1800m");
conf.set("spark.yarn.am.cores", "1");
conf.set("spark.yarn.am.memory", "1800m");
conf.set("spark.executor.instances", "30");
conf.set("spark.executor.cores", "3");
conf.set("spark.executor.memory", "2048m");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> inputRDD = sc.textFile(...);
...
Run Code Online (Sandbox Code Playgroud)
当我使用命令(master=yarn&deploy-mode=client)运行此应用程序时
spark-submit --class spark.MyApp --master yarn --deploy-mode client /home/myuser/application.jar
Run Code Online (Sandbox Code Playgroud)
一切似乎都运行良好,Spark History UI显示正确的执行者信息:

但是当用(master=yarn&deploy-mode=cluster)运行时
我的Spark UI显示错误的执行程序信息(~512 MB而不是~1400 MB):

此外,我的应用程序名称Test App Name在客户端模式下运行时等于,但spark.MyApp在群集模式下运行时.但是,在群集模式下运行时,似乎会采用某些默认设置.我在这做错了什么?如何为群集模式进行这些设置?
我在由YARN管理的HDP 2.5集群上使用Spark 1.6.2.
由于以下错误消息,我的 Spark 流应用程序(在 HD Insights 上运行)不断有任务失败:
ExecutorLostFailure (executor 525 exited unrelated to the running tasks) Reason: Container container_1495825717937_0056_01_000916 on host: 10.0.0.14 was preempted.
Run Code Online (Sandbox Code Playgroud)
你有什么想法我应该在这里做什么吗?我应该如何处理此错误消息对我来说并不明显。
我有一个在两个实例中使用云资源的火花集群。一为主人,一为工人。总资源为4核和10G ram。我可以启动shell,并且worker可以成功注册。但是当我运行简单的代码时。
来自shell的错误是:Spark版本:2.3.0系统:CentOS v7防火墙已停止。
这是配置:
export JAVA_HOME=/usr/java/jdk1.8.0_144
export SPARK_MASTER_IP=IP
export PYSPARK_PYTHON=/opt/anaconda3/bin/python
export SPARK_WORKER_MEMORY=2g
export SPARK_WORK_INSTANCES=1
export SPARK_WORkER_CORES=4
export SPARK_EXECUTOR_MEMORY=1g
Run Code Online (Sandbox Code Playgroud)
我使用三台物理计算机使用类似的配置设置了另一个Spark集群,它们运行良好。一开始我遇到了同样的错误,但是我通过停止防火墙解决了。是的,我想在云上设置集群,但不幸的是,我遇到了相同的错误,但是没有使用相同的解决方案解决它。我很好奇这是否是端口问题,因为我只在http 80,4040,6066,7077,8080,8081,8787上打开了端口。
这是错误:
以下是日志:
主日志:
2018-04-12 13:09:14 INFO Master:54 - Registering app Spark shell
2018-04-12 13:09:14 INFO Master:54 - Registered app Spark shell with ID app-20180412130914-0000
2018-04-12 13:09:14 INFO Master:54 - Launching executor app-20180412130914-0000/0 on worker worker-20180411144020-192.**.**.**-44986
2018-04-12 13:11:15 INFO Master:54 - Removing executor app-20180412130914-0000/0 because it is EXITED
2018-04-12 13:11:15 INFO Master:54 - Launching executor app-20180412130914-0000/1 on worker worker-20180411144020-192.**.**.**-44986
2018-04-12 …Run Code Online (Sandbox Code Playgroud) 我在由三个节点组成的集群上运行一个火花流应用程序,每个节点都有一个工作程序和三个执行程序(因此总共有9个执行程序)。我正在使用Spark版本2.3.2和Spark独立群集管理器。
调查工作计算机完全停机时的最近一个问题,我可以看到由于以下原因,火花流作业已停止:
18/10/08 11:53:03 ERROR TaskSetManager: Task 122 in stage 413804.1 failed 8 times; aborting job
Run Code Online (Sandbox Code Playgroud)
由于同一阶段中的一项任务失败了8次,因此该作业被中止。这是预期的行为。
提到的任务失败,原因如下:
18/10/08 11:53:03 INFO DAGScheduler: ShuffleMapStage 413804 (flatMapToPair at MessageReducer.java:30) failed in 3.817 s due to Job aborted due to stage failure: Task 122 in stage 413804.1 failed 8 times, most recent failure: Lost task 122.7 in stage 413804.1 (TID 223071001, 10.12.101.60, executor 1): java.lang.Exception: Could not compute split, block input-39-1539013586600 of RDD 1793044 not found
org.apache.spark.SparkException: Job aborted due to stage …Run Code Online (Sandbox Code Playgroud) executor apache-spark spark-streaming apache-spark-standalone
executor ×10
java ×5
apache-spark ×4
concurrency ×2
hadoop-yarn ×2
exit ×1
future ×1
interrupt ×1
python ×1
quicksort ×1
runnable ×1
threadpool ×1