就像使用join()连接两个线程一样,有没有办法将线程连接到ExecutorService对象(线程池)?
我在Java中使用ExecutorService,我注意到了一个我不理解的行为.我使用Callable,当我调用我的线程(实现Callable的类)时,我设置了一个超时.然后我等待结果,future.get()之后我想检查future.isDone()执行任务期间是否发生超时.
正如我在有关超时的invokeAll文档中所读到的那样: returns a list of Futures representing the tasks, in the same sequential order as produced by the iterator for the given task list. If the operation did not time out, each task will have completed. If it did time out, some of these tasks will not have completed.
所以我想我会在两种情况下获得Future结果列表,如果发生超时,如果没有.
现在发生的事情如下:当发生超时时,代码不会继续future.get(),我没有达到可以检查是否发生超时的程度future.isDone().我没有发现任何异常,我直接导致我的代码中的finally块,我真的不明白.
这是我的代码片段:
try {
// start all Threads
results = pool.invokeAll(threads, 3, TimeUnit.SECONDS);
for (Future<String> future : results)
{
try
{ …Run Code Online (Sandbox Code Playgroud) 参考以下代码.
for (Future<Long> future : list) {
try {
sum += future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
现在,当我打电话给future.get这将是阻止通话吗?因此,如果我在主线程和Android应用程序上执行此操作将导致ANR异常?我注意到也会抛出Interrupted异常,这是否意味着它们是在get函数中调用线程的睡眠?
我正在尝试使用执行器接口实现多线程方法,其中我在主类中生成了多个线程
class Main
{
private static final int NTHREADS = 10;
public static void main(String[] args)
{
.........
String str = createThreads(document);
.............
}
public String createThreads(String docString)
{
........
.......
Map<String,String> iTextRecords = new LinkedHashMap<String, String>();
if(!iText.matches(""))
{
String[] tokenizedItext = iText.split("\\^");
ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
for(int index = 0 ;index < tokenizedItext.length;index++)
{
Callable<Map<String,String>> worker = null;
Future<Map<String,String>> map = null;
if(tokenizedItext[index].matches("^[0-9.<>+-= ]+$") || tokenizedItext[index].matches("^\\s+$"))
{
iTextRecords.put(tokenizedItext[index],tokenizedItext[index]);
}
else
{
worker = new MultipleDatabaseCallable(tokenizedItext[index],language);
map = executor.submit(worker);
try …Run Code Online (Sandbox Code Playgroud) 我在一夜之间启动了我的实例以了解它是如何处理的,当我今天早上到达时,我正面临着一个问题
Exception in thread "pool-535-thread-7" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:691)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:943)
at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:992)[info] application - Connecting to server A
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Run Code Online (Sandbox Code Playgroud)
我的代码的目的很简单:每隔5分钟,我连接到远程服务器列表,发送请求(通过套接字),就是这样.
这是我的代码:
我的"cron"任务:
/** will create a new instance of ExecutorService every 5 minutes, loading all the websites in the database to check their status **/
/** Maybe that's where the problem is ? I need to empty (GC ?) this ExecutorService ? **/
Akka.system().scheduler().schedule(
Duration.create(0, TimeUnit.MILLISECONDS), …Run Code Online (Sandbox Code Playgroud) 我在ThreadPoolExecutor中运行任务时发现了意外的死锁.
这个想法是一个主要任务,它启动一个改变标志的次要任务.主任务暂停,直到辅助任务更新标志.
我想知道:
corePoolSize = 2是一个安全值来防止这种死锁吗?
import java.util.concurrent.*;
class ExecutorDeadlock {
/*------ FIELDS -------------*/
boolean halted = true;
ExecutorService executor;
Runnable secondaryTask = new Runnable() {
public void run() {
System.out.println("secondaryTask started");
halted = false;
System.out.println("secondaryTask completed");
}
};
Runnable primaryTask = new Runnable() {
public void run() {
System.out.println("primaryTask started");
executor.execute(secondaryTask);
while (halted) {
try {
Thread.sleep(500);
}
catch (Throwable e) {
e.printStackTrace();
}
}
System.out.println("primaryTask completed");
}
}; …Run Code Online (Sandbox Code Playgroud)public class myService {
@Autowired
ExecutorService executor;
public Result getServiceResult(Token token, Profile profile){
//validate token and profile
return getResult(token, profile).get();
}
private getResult (Token token, Profile profile){
Future<Result> = threadPoolExecutor.submit(
() -> myManager.createAccount(token, profile));
}
}
Run Code Online (Sandbox Code Playgroud)
这段代码在我目前的工作中运行良好.我无法理解如何threadPoolExecutor.submit通过"功能/方法"但不能通过callable?
我正在使用Java 8和Spring框架.
我通过流传递一组任务,这是一个简化的演示:
ExecutorService executorService = Executors.newCachedThreadPool((r) -> {
Thread thread = new Thread();
thread.setDaemon(true); // even I removed this, it's still not working;
return thread;
});
IntStream.range(0, TASK_COUNT).forEach(i -> {
executorService.submit(() -> {
out.println(i);
return null;
});
});
Run Code Online (Sandbox Code Playgroud)
在提交完所有任务后,我尝试等待所有这些任务完成后使用:
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
Run Code Online (Sandbox Code Playgroud)
但输出没有,没有打印出来.
有问题?任何帮助将不胜感激.
一个奇怪的发现是,当使用默认的DefaultThreadFactory时,它正在工作.
ExecutorService executorService = Executors.newCachedThreadPool();
Run Code Online (Sandbox Code Playgroud)
FYI守护程序线程是我已经检查过的原因.为了调试,我故意设置它们.
我认为Java错误是严重问题的征兆,因此不应进行处理。那么为什么这段代码运行良好?
public static void main(String[] args)
{
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> {throw new AssertionError();});
while (!future.isDone()) {
}
System.out.println("done");
}
Run Code Online (Sandbox Code Playgroud)
我有一个未实现的方法,该方法抛出AssertionError来提醒我实现它,但是它完全被吞没了,没有任何迹象表明存在严重错误。
我目前正在与Eclipse Paho一起开发MQTT客户端服务,以使用更大的软件,并且遇到性能问题。我收到了很多要发布给代理的事件,并且正在使用GSON对这些事件进行序列化。我已经对序列化和发布进行了多线程处理。根据基本基准,序列化和发布最多需要1毫秒。我使用的ExecutorService的线程池大小为10(目前)。
我的代码当前每秒向ExecutorService提交大约50个Runnable,但是我的代理每秒仅报告5-10条消息。我之前已经对MQTT设置进行了基准测试,并设法以非多线程方式每秒发送约9000条以上的MQTT消息。
线程池是否有这么多的开销,我只能从中得到这么少的发布?
public class MqttService implements IMessagingService{
protected int PORT = 1883;
protected String HOST = "localhost";
protected final String SERVICENAME = "MQTT";
protected static final String COMMANDTOPIC = "commands";
protected static final String REMINDSPREFIX = "Reminds/";
protected static final String VIOLATIONTOPIC = "violations/";
protected static final String WILDCARDTOPIC = "Reminds/#";
protected static final String TCPPREFIX = "tcp://";
protected static final String SSLPREFIX = "ssl://";
private MqttClient client;
private MqttConnectOptions optionsPublisher = new MqttConnectOptions();
private ExecutorService pool = …Run Code Online (Sandbox Code Playgroud) executorservice ×10
java ×7
callable ×2
java-8 ×2
threadpool ×2
android ×1
concurrency ×1
deadlock ×1
future ×1
join ×1
mqtt ×1
runnable ×1
timeout ×1