我开始学习java,现在我正处于并发章节.在阅读了一些关于并发的内容后,我尝试了一个自己的例子.
public class Task implements Runnable{
public void run() {
while(!Thread.interrupted()) {
try {
System.out.println("task");
TimeUnit.SECONDS.sleep(2);
}catch (InterruptedException e) {
System.out.println("interrupted");
}
}
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Task());
TimeUnit.SECONDS.sleep(10);
exec.shutdownNow();
}
Run Code Online (Sandbox Code Playgroud)
问题是我期待看到以下输出:
task
task
task
task
task
interrupted
Run Code Online (Sandbox Code Playgroud)
但是在我得到这个之后,程序会继续打印,直到我关闭它.
所以,我的问题是我做错了什么?为什么程序继续打印?
我正在尝试执行一个简单的计算(它调用Math.random()10000000次).令人惊讶的是,以简单方法运行它比使用ExecutorService要快得多.
我已经阅读了ExecutorService令人惊讶的性能收支平衡点的另一个主题---经验法则?并尝试通过执行Callable使用批次来遵循答案,但性能仍然很差
如何根据我当前的代码改进性能?
import java.util.*;
import java.util.concurrent.*;
public class MainTest {
public static void main(String[]args) throws Exception {
new MainTest().start();;
}
final List<Worker> workermulti = new ArrayList<Worker>();
final List<Worker> workersingle = new ArrayList<Worker>();
final int count=10000000;
public void start() throws Exception {
int n=2;
workersingle.add(new Worker(1));
for (int i=0;i<n;i++) {
// worker will only do count/n job
workermulti.add(new Worker(n));
}
ExecutorService serviceSingle = Executors.newSingleThreadExecutor();
ExecutorService serviceMulti = Executors.newFixedThreadPool(n);
long s,e;
int tests=10; …Run Code Online (Sandbox Code Playgroud) 我的JVM在带有openJDK 1.7.0_51 64Bit的CentOS 6.0上运行时遇到问题.我的系统是一个4核系统,8GB Ram.
我正在运行我自己编写的Java多线程应用程序.它应该将大量数据插入NoSQL数据库.为此,我使用java.concurrent.Executors中的"CachedThreadPoolExecutor"生成4个线程.
我实例化了4个实现"Runnable"接口的Worker.然后我使用线程池执行Thread.这是我的代码:
public void startDataPump(int numberOfWorkers){
//class "DataPump" implements runnable
for (int i = 0; i < numberOfWorkers; i++){
DataPump pump = new DataPump();
//"workerList" contains all workers and is a simple arrayList to keep track of the workers
workerList.add(pump);
//"workers" is the thradpool that has been
//initialized earlier with "Executors.newCachedThreadPool()
workers.execute(pump);
}
}
Run Code Online (Sandbox Code Playgroud)
运行它时,使用参数4,它将在Threadpool中生成4个Threads.我假设JVM或我的操作系统足够聪明,可以在我的所有内核上安排这些线程.但是,只有我的cpu的一个核心工作在100%,其他核心几乎都处于空闲状态.
我在代码中做错了什么,或者这是一个JVM/OS问题.如果是这样,我能做些什么吗?仅在1个核心上运行此应用程序是极速减慢整个应用程序.
非常感谢帮助:)
我是Python的多线程新手,目前正在编写一个附加到csv文件的脚本.如果我要将多个线程提交给一个concurrent.futures.ThreadPoolExecutor将行附加到csv文件的行.如果附加是这些线程唯一与文件相关的操作,我该怎么做才能保证线程安全?
我的代码的简化版本:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for count,ad_id in enumerate(advertisers):
downloadFutures.append(executor.submit(downloadThread, arguments.....))
time.sleep(random.randint(1,3))
Run Code Online (Sandbox Code Playgroud)
我的线程类是:
def downloadThread(arguments......):
#Some code.....
writer.writerow(re.split(',', line.decode()))
Run Code Online (Sandbox Code Playgroud)
我应该设置一个单独的单线程执行程序来处理写入,还是担心我是否只是附加?
编辑:我应该详细说明,当写入操作发生时,下一次附加文件之间的分钟数差别很大,我只关心在测试我的脚本时没有发生这种情况,我宁愿为此加以覆盖.
我在AWS-EMR集群上运行我的工作.它是使用cr1.8xlarge实例的40节点集群.每个cr1.8xlarge有240G内存和32个内核.我可以使用以下配置运行:
--driver-memory 180g --driver-cores 26 --executor-memory 180g --executor-cores 26 --num-executors 40 --conf spark.default.parallelism=4000
Run Code Online (Sandbox Code Playgroud)
要么
--driver-memory 180g --driver-cores 26 --executor-memory 90g --executor-cores 13 --num-executors 80 --conf spark.default.parallelism=4000
Run Code Online (Sandbox Code Playgroud)
从作业跟踪器网站开始,同时运行的任务数量主要只是可用的核心数(cpu).所以我想知道我们是否希望每个节点有多个执行程序?
谢谢!
我正在创建一个ExecutorService执行某些任务的程序,在正常条件下,这些任务可能需要一分钟才能完成,但是在任何情况下,从任务开始时不得超过两分钟。
我的代码如下:
ExecutorService executorService = Executors.newFixedThreadPool(10);
ArrayList<Future<?>> futuresList = new ArrayList<Future<?>>();
for (String singleTask: taskList) {
futuresList.add(executorService.submit( new Runnable(){
@Override
public void run(){
try {
performTask(p1, singleTask, p3);
} catch (IOException | InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}));
}
for(Future<?> future : futures) {
future.get(120, TimeUnit.SECONDS);
}
Run Code Online (Sandbox Code Playgroud)
这将阻塞直到指定的超时,然后继续。我的问题如下:
1)如果task1阻塞了两分钟,task2并且阻塞了两分钟-那么task2将“阻塞”总共4分钟(因为future.get(120, TimeUnit.SECONDS); 直到task1完成阻塞才在task2上调用它)-即使两个任务都在同一时间提交并开始执行时间
2)如果我提交的任务多于10个,则future.get(120, TimeUnit.SECONDS); 在第11个任务上调用之前的任务尚未完成的情况下,任务11+可能永远不会阻塞所需的时间
我的目标是使每个任务最多执行两分钟,而不管列表中的任务数量如何,以及之前或之后有多少个任务。
谢谢
我有一个 Spring rest 控制器,它使用 Spring 的 @Async 方法调用异步方法,并立即向客户端返回一个 http 202 代码(已接受)。(异步作业很繁重,可能导致超时)。所以实际上,在异步任务结束时,我会向客户发送一封电子邮件,告诉他请求的状态。
一切正常,但我问自己,如果我的服务器/jvm 崩溃或关闭,我该怎么办?我的客户会收到一个 202 代码,并且永远不会收到状态电子邮件。
有没有一种方法可以(实时)同步数据库甚至文件中的 ThreadPoolTaskExecutor,让服务器在启动时恢复,而无需我自己管理复杂的规则和演变状态?
这是我的 Executor 配置
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("asyncTaskExecutor-");
executor.setAwaitTerminationSeconds(120);
executor.setKeepAliveSeconds(30);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
Run Code Online (Sandbox Code Playgroud)
启动异步任务的控制器
@RequestMapping(value = "/testAsync", method = RequestMethod.GET)
public void testAsync() throws InterruptedException{
businessService.doHeavyThings();
}
Run Code Online (Sandbox Code Playgroud)
异步方法调用:
@Async
public void doHeavyThings() throws …Run Code Online (Sandbox Code Playgroud) std::promise有一个构造函数,它分配用于存储共享状态的内存。.then在我到目前为止所看到的 的实现中,人们以类似于共享状态std::function 的类型擦除方式存储函数。stlab 的并发库甚至放置了一个
using then_t = std::vector<std::pair<executor_t, task<void()>>>;
Run Code Online (Sandbox Code Playgroud)
进入共享状态(以启用可拆分的 future)。
是否有意指定是否可以为延续指定分配器?
在P0443R3中,他们在 1.2.9 自定义内存分配的属性中说
执行器实现应使用提供的分配器来分配存储提交的函数对象所需的任何内存。
我认为这仅意味着通过函数提交的代理的存储,而不是共享状态中的execute指针。 std::future
我研究了最近关于期货或执行人的提案,但我找不到任何东西。无论如何,最近的论文似乎倾向于让执行者处于共同的未来状态。如果执行器有一个关联的自定义分配器,是否应该使用它?
我感觉我错过了什么。
--
编辑:我对此进行了更多思考,我想我可以理解为什么有人愿意不明确这一点。如果 future 由执行者参数化,那么它们的共享状态可能如下所示
template <typename Executor, typename T>
struct shared_state_t {
// some prevention from race condition, for example
std::atomic<unsigned> flags;
// some sort of exception or value representation, for example
std::variant<std::exception_ptr, T> maybe_value;
// some handle to continuation
continuation_t<Executor, T> continuation;
};
Run Code Online (Sandbox Code Playgroud)
wherecontinuation_t<Executor, …
我正在开发一些最终将使用线程池进行多线程的代码Executor。线程池执行的任务将进行回调并(有时)将进一步的任务提交到任务队列。我想首先开发单线程代码,使其正确(我正在使用测试驱动开发),然后才进行更改以确保线程安全(锁等)。为此,我需要一个Executor可以安全地与非线程安全代码一起使用的代码。
我认为这意味着我需要一个Executor单线程的。也就是说,它导致所有工作都由调用线程完成。JRE 提供这样的吗Executor?或者是否可以将其中一个Executor配置为在该模式下运行?
我已经在使用Humble Object测试模式来测试我的大部分单线程代码。然而,我的一些代码必须与 anExecutor或可能与 an交互ExecutorService,因为它涉及任务的 调度和重新提交,并且它将以一种不平凡的方式进行。测试该代码是这里的挑战。这些任务更新一个共享对象,该对象保存其结果和输入数据。我想推迟使该共享对象线程安全,直到我实现并调试了调度和重新提交代码。
有人可以从实际角度解释一下 Kotlin CoroutineExecutorCoroutineDispatcher和Kotlin Coroutine 之间的区别,即在哪些场景下使用一个协程来对抗另一个协程?CoroutineDispatcher
到目前为止,我一直在使用Dispatchers,但是(据我所知)它不能给我一个后台线程。这就是我使用的原因newSingleThreadExecutor()。
但我注意到的是,我的主进程在使用ExecutorCoroutineDispatcher(1)时永远不会结束( CoroutineDispatcher它按预期完成(2))。经过一番调查后,我似乎应该运行方法close()来ExecutorCoroutineDispatcher完成主流程(3)。由于 CoroutineDispatcher您不必这样做,因此它甚至没有方法close()(4)。是CoroutineDispatcher自动关闭的吗?为什么我们有 for 的关闭流程 ExecutorCoroutineDispatcher,但没有 for CoroutineDispatcher?
下面是我用于测试的代码:
fun main() = runBlocking<Unit> {
val dispatcher1 = Executors.newSingleThreadExecutor().asCoroutineDispatcher() // (1) <-- main process runs indefinitely w/o closing dispatcher1 (3)
val dispatcher2 = Dispatchers.Unconfined // (2)
println("Start")
launch(dispatcher1) {
println("Child")
delay(1000)
printInfo(coroutineContext, this)
}.join()
println("End")
dispatcher1.close() // (3) <-- need to close dispatcher1 for …Run Code Online (Sandbox Code Playgroud) executor ×10
java ×6
concurrency ×3
apache-spark ×1
asynchronous ×1
c++ ×1
csv ×1
dispatcher ×1
future ×1
jvm ×1
memory ×1
performance ×1
python ×1
spring ×1
unit-testing ×1