标签: executors

如何从线程池中获取线程ID?

我有一个固定的线程池,我提交任务(限于5个线程).如何找出这5个线程中的哪一个执行我的任务(类似"线程#3 of 5正在执行此任务")?

ExecutorService taskExecutor = Executors.newFixedThreadPool(5);

//in infinite loop:
taskExecutor.execute(new MyTask());
....

private class MyTask implements Runnable {
    public void run() {
        logger.debug("Thread # XXX is doing this task");//how to get thread id?
    }
}
Run Code Online (Sandbox Code Playgroud)

java multithreading executorservice threadpool executors

127
推荐指数
3
解决办法
18万
查看次数

具有限制/吞吐量控制的Java Executor

我正在寻找一个Java Executor,它允许我指定限制/吞吐量/起搏限制,例如,不超过100个任务可以在一秒钟内处理 - 如果更多任务被提交,他们应该排队并稍后执行.这样做的主要目的是避免在遇到外部API或服务器时遇到限制.

我想知道基础Java(我怀疑,因为我检查过)或其他可靠的地方(例如Apache Commons)是否提供了这个,或者我是否必须自己编写.最好是轻量级的.我不介意自己写,但如果有一个"标准"版本在那里,我至少想先看看它.

java multithreading executors

27
推荐指数
2
解决办法
1万
查看次数

如何避免Spark执行器丢失和纱线容器由于内存限制而导致它被杀死?

我有以下代码,hiveContext.sql()大部分时间都会触发.我的任务是我想创建几个表并在处理完所有hive表分区后插入值.

所以我首先show partitions在for循环中触发并使用它的输出,我调用一些创建表的方法(如果它不存在)并使用它插入它们hiveContext.sql.

现在,我们不能hiveContext在执行程序中执行,所以我必须在驱动程序的for循环中执行它,并且应该逐个串行运行.当我在YARN集群中提交这个Spark作业时,几乎所有的时间我的执行程序都因为shuffle未找到异常而丢失.

现在这种情况正在发生,因为YARN因为内存过载而杀死了我的执行程序.我不明白为什么,因为我为每个hive分区设置了一个非常小的数据集,但它仍然导致YARN杀死我的执行程序.

以下代码是否会并行执行所有操作并尝试同时容纳内存中的所有hive分区数据?

public static void main(String[] args) throws IOException {   
    SparkConf conf = new SparkConf(); 
    SparkContext sc = new SparkContext(conf); 
    HiveContext hc = new HiveContext(sc); 

    DataFrame partitionFrame = hiveContext.sql(" show partitions dbdata partition(date="2015-08-05")"); 
  
    Row[] rowArr = partitionFrame.collect(); 
    for(Row row : rowArr) { 
        String[] splitArr = row.getString(0).split("/"); 
        String server = splitArr[0].split("=")[1]; 
        String date =  splitArr[1].split("=")[1]; 
        String csvPath = "hdfs:///user/db/ext/"+server+".csv"; 
        if(fs.exists(new Path(csvPath))) { 
            hiveContext.sql("ADD FILE " + csvPath); 
        } 
        createInsertIntoTableABC(hc,entity, …
Run Code Online (Sandbox Code Playgroud)

memory executors hadoop-yarn apache-spark apache-spark-sql

16
推荐指数
1
解决办法
2万
查看次数

Java8 ForkJoinPool和Executors.newWorkStealingPool之间的详细区别?

使用中的低级差异是什么:

ForkJoinPool = new ForkJoinPool(X);
Run Code Online (Sandbox Code Playgroud)

ExecutorService ex = Executors.neWorkStealingPool(X);
Run Code Online (Sandbox Code Playgroud)

其中X是所需的并行级别,即线程运行..

根据文档我发现它们相似.还告诉我哪一个在任何正常用途下更合适和安全.我有1.3亿个条目写入BufferedWriter并使用Unix排序按第1列排序.

另请告诉我如果可能的话要保留多少个线程.

注意:我的系统有 8个核心处理器和 32 GB RAM.

multithreading executorservice fork-join executors forkjoinpool

15
推荐指数
2
解决办法
5936
查看次数

Java并发:用很少的线程执行许多"无限"任务

我正在为一组N粒子构建一个(并发)模拟器,这些粒子根据牛顿定律在空间中移动.我的想法是将每个粒子建模为一个任务,它与其他粒子(任务)相互作用,以获得它们的位置和质量,以便计算它所受的净力.每个粒子任务都是如此

while(true){
   force = thisParticle.calculateNetForce(allTheParticles);
   thisParticle.waitForAllTheParticlesToCalculateNetForce(); // synchronization
   thisParticle.updatePosition(force);
   thisParticle.waitForAllTheParticlesToUpdateTheirState(); // synchronization
}
Run Code Online (Sandbox Code Playgroud)

我可以拥有大量粒子(100或更多),因此我无法创建如此多的Java线程(映射到物理线程).我的想法是使用Runtime.getRuntime().availableProcessors()+1可以执行许多任务的线程.

但是,我不能使用FixedThreadExecutor,因为粒子任务不会结束.我想使用FixedThreadExecutor,它必须也能够在内部执行某种调度.你知道为此目的吗?

或者,您是否可以通过并发的角度(例如,不同的任务分解)向我建议更好的方法来建模这样的系统?

Ps:我仅限于"经典"并发机制,不包括演员或类似的架构.

java concurrency executors

10
推荐指数
2
解决办法
827
查看次数

你有多少詹金斯执行官?

我和Jenkins一起运行并行测试.

我设置它的方式是我有一个构建流程作业,并行执行其他三个作业.其他三个作业连接到单独的Test XML文件.

当我最初开始这个时,我遇到的问题是只有两个作业会执行,而第三个作业只能在其他一个作业完成后执行.

我发现这是由于我的Jenkins设置了执行程序的数量,2现在设置为5.

然而,作为一个有趣的问题,仅仅为了将来的计划,詹金斯是否可以限制你可以拥有的执行者数量?或者是否有推荐的数字,你不应该超过?或者它完全取决于您运行它的环境?

如果有上限/建议数量不超过?我认为解决这个问题的最佳方法是使用主/从方案并将工作负载分散到多个VM.

例如,如果我将它设置为6个执行程序,这是否意味着我将在每个VM上有6个执行程序?或者在VM之间共享的6个执行程序?

executors jenkins

10
推荐指数
2
解决办法
8714
查看次数

如何从ScheduledThreadPoolExecutor中取消待处理的项目?

我有一项任务,要求我安排任务,并在发生特定事件时将其删除.我正在使用ScheduledThreadPoolExecutor来安排任务,这非常简单.但是,我找到了两种取消待处理项目的方法,它们看起来有点奇怪.

我很好奇他们是否有生产质量.如果他们两个都没有,那你有什么建议?

这是我做的骨架​​:

private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);

public void doStuff() {
    //...
    scheduler.schedule(new Runnable() {/*...*/}, 10, TimeUnit.MILISECONDS)
}

public void actOnEvent() {
    cancelPendingItems();
    doMoreStuff();
}

public void cancelPendnigItems() {
    // TODO implement me
}
Run Code Online (Sandbox Code Playgroud)

这是候选人1:

public void cancelPendingItems() {
    scheduler.getQueue().clear();
}
Run Code Online (Sandbox Code Playgroud)

这是候选人2:

public void cancelPendingItems() {
    for (Runnable task : scheduler.getQueue()) {
        scheduler.remove(task);
    }
}
Run Code Online (Sandbox Code Playgroud)

两者看起来像是一个黑客,因为它们依赖于ScheduledExecutor接口中未指定的ScheduledThreadPoolExecutor.queue属性.我有点担心我可能会违反ScheduledThreadPoolExecutor的不变量而且我会发现它太迟了.

那么,这些片段是否会按照我希望他们做的去做?有没有更好/更清洁的方法呢?

java multithreading executors

9
推荐指数
2
解决办法
5074
查看次数

ScheduledExecutorService,如何在不停止执行程序的情况下停止操作?

我有这个代码:

ScheduledExecutorService scheduledExecutor;
.....
ScheduledFuture<?> result = scheduledExecutor.scheduleWithFixedDelay(
    new SomethingDoer(),0, measurmentPeriodMillis, TimeUnit.MILLISECONDS);
Run Code Online (Sandbox Code Playgroud)

某些事件后,我应该停止行动,这在申报run()的方法SomethingDoer,它实现Runnable.

我怎样才能做到这一点?我无法关闭执行程序,我应该只撤销我的周期性任务.我可以用result.get()它吗?如果可以,请告诉我它将如何运作.

java executors

9
推荐指数
2
解决办法
2万
查看次数

超时后中止countDownLatch.await()

我使用一个ExecutorService实现一个3线程池,并使用CountDownLatch来监视所有线程的完成情况,以便进一步处理.

ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
CountDownLatch countDownLatch = new CountDownLatch(3);

AuthorisationHistoryTask task1 =
    new AuthorisationHistoryTask(commonDataThread, countDownLatch );
PreAuthHistoryTask task2 = 
    new PreAuthHistoryTask(userID,sessionID, commonDataThread, countDownLatch );

SettlementHistTask task4 = new SettlementHistTask(commonDataThread,countDownLatch);

Future<Map<String, Object>> futureAuthHistory = threadExecutor.submit (task1);
Future<Map<String, Object>> futurePreAuthHist = threadExecutor.submit (task2);
Future<Map<String, Object>> futureSettleHist = threadExecutor.submit(task4);

threadExecutor.shutdown();

try {
     countDownLatch.await();
}catch (InterruptedException e) { 
     logger.logCommon("InterruptedException",CLASS_NAME);                     
}catch (ExecutionException ex) { 
     logger.logCommon("InterruptedException",CLASS_NAME); 
}
Run Code Online (Sandbox Code Playgroud)

我曾经countDownLatch.await()等到所有线程都完成了.我希望这个过程countDownLatch.await()在TIME OUT表示45秒后中止.我该如何实现呢?

java multithreading executorservice executors

8
推荐指数
1
解决办法
9206
查看次数

C++ 上下文中的执行程序模式是什么?

asio 的作者 Christopher Kohlhoff 正在为 C++ 中的执行程序开发一个库和提案。到目前为止,他的工作包括这个repodocs。不幸的是,基本原理部分尚未编写。到目前为止,文档给出了一些图书馆功能的例子,但我不觉得我遗漏了什么。不知何故,这不仅仅是一系列花哨的调用函数。

我在 Google 上可以找到的所有内容都非常特定于 Java,而且很多都是特定于特定框架的,所以我很难弄清楚这个“执行程序模式”是什么。

在这种情况下,执行者是什么?他们在做什么?什么时候它们会有所帮助的规范示例是什么?执行者之间存在哪些差异?执行者的替代方案是什么?它们如何比较?特别是,似乎与事件循环有很多重叠,其中事件是初始输入事件、执行事件和关闭事件。

当试图找出新的抽象概念时,我通常会发现理解动机的关键。那么对于执行者来说,我们试图抽象什么?为什么?我们试图使什么成为泛型?如果没有执行者,我们还需要做什么额外的工作?

c++ design-patterns executors

7
推荐指数
1
解决办法
3657
查看次数