我有这个问题,我有
private ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
Run Code Online (Sandbox Code Playgroud)
和每50毫秒创建的任务:
executor.scheduleAtFixedRate(myTask, 0, 50, TimeUnit.MILLISECONDS);
Run Code Online (Sandbox Code Playgroud)
myTask 有时需要一段时间才能完成(比如2-3秒左右),但newSingleThreadScheduledExecutor保证下一个预定的myTask将等到当前的myTask完成.
但是,我不时会收到此错误:
执行: java.util.concurrent.RejectedExecutionException
我该怎么办?谢谢
ScheduledExecutorService如果之前的任务尚未完成,我有兴趣使用为任务生成多个线程.例如,我需要每0.5秒处理一个文件.第一个任务开始处理文件,0.5秒后如果第一个线程没有完成第二个线程产生并开始处理第二个文件,依此类推.这可以通过以下方式完成:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(4)
while (!executor.isShutdown()) {
executor.execute(task);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// handle
}
}
Run Code Online (Sandbox Code Playgroud)
现在我的问题:为什么我不能这样做executor.scheduleAtFixedRate?
我得到的是,如果第一个任务需要更长时间,第二个任务会在第一个任务完成后立即启动,但即使执行程序具有线程池,也不会启动新线程.executor.scheduleWithFixedDelay很清楚 - 它在它们之间执行具有相同时间跨度的任务,并且完成任务所需的时间并不重要.所以我可能误解了ScheduledExecutorService目的.
也许我应该看看另一种执行者?或者只使用我在这里发布的代码?有什么想法吗?
BoundedExecutorJava Concurrency in Practice一书中的实现有些奇怪.
当有足够的线程在Executor中排队或运行时,它应该通过阻止提交线程来限制任务提交给Executor.
这是实现(在catch子句中添加缺少的rethrow之后):
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
@Override public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
Run Code Online (Sandbox Code Playgroud)
当我BoundedExecutor用a Executors.newCachedThreadPool()和4的实例化实例化时,我希望缓存的线程池实例化的线程数永远不会超过4.但实际上,它确实如此.我已经得到了这个小测试程序来创建多达11个线程:
public static …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用asyncio来处理并发网络I/O. 在一个点上安排了大量的功能,这些功能在每个功能完成所需的时间上变化很大.然后,对于每个输出,在单独的过程中处理接收的数据.
处理数据的顺序是不相关的,因此,考虑到输出的潜在等待时间非常长,我希望await无论将来如何完成而不是预定义的顺序.
def fetch(x):
sleep()
async def main():
futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
for f in futures:
await f
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
通常,等待期货排队的顺序很好:
蓝色表示每个任务在执行程序队列中的时间,run_in_executor即已被调用,但该函数尚未执行,因为执行程序仅同时运行5个任务; 绿色是执行功能本身所花费的时间; 红色是等待所有先前期货所花费的时间await.
在我的情况下,函数的时间变化很大,等待队列中的先前期货等待时会有很多时间丢失,而我可以在本地处理GET输出.这使得我的系统空闲一段时间只是在几个输出同时完成时被淹没,然后跳回空闲等待更多请求完成.
有没有办法await在执行者中首先完成任何未来?
我有这个问题:
我有一个计时器.使用scheduleAtFixedRate,它会创建一个新的Timer任务.在该计时器任务中有某些代码,可能需要一段时间才能完成.如何在前一个任务尚未完成时确保Timer不会创建新任务?
谢谢
在我的项目中,我正在构建一个Java执行框架,用于接收来自客户端的工作请求.工作(不同大小)被分解为一组任务,然后排队等待处理.有单独的队列来处理每种类型的任务,每个队列都与ThreadPool相关联.ThreadPools的配置方式使得引擎的整体性能最佳.
这种设计有助于我们有效地平衡请求,并且大量请求不会最终占用系统资源.但是,当某些队列为空且其各自的线程池处于空闲状态时,解决方案有时会失效.
为了使这更好,我正在考虑实现一个工作/任务窃取技术,以便负载很重的队列可以从其他ThreadPools获得帮助.但是,这可能需要实现我自己的Executor,因为Java不允许多个队列与ThreadPool相关联,并且不支持工作窃取概念.
阅读有关Fork/Join的信息,但这似乎不适合我的需求.构建此解决方案的任何建议或替代方法都非常有用.
谢谢安迪
我正在开发一个项目,其中以下行用于创建测试 Executor 成员变量实例:
private Executor executor = Runnable::run;
代码运行和编译,但我不明白如何Runnable::run创建一个实例,Executor因为它们是不同的接口。
有谁能解释一下吗?特别是:
Executor实现(因为Executor是不同的接口)?Executor被创造?例如单线程或池谢谢。
我想知道是否有任何理由使用Executor代替ExecutorService。
据我所知Executor,JDK中没有实现该接口,这也不是一个实现ExecutorService,这意味着您必须关闭该服务,以便不存在内存泄漏。您无法关闭系统Executor,但可以使用ExecutorService。
因此,在任何情况下都可以使用类似的方法:
private final Executor _executor = Executors.newCachedThreadPool();
Run Code Online (Sandbox Code Playgroud)
Executor接口的目的是什么?实例赞赏。
我是Python的多线程新手,目前正在编写一个附加到csv文件的脚本.如果我要将多个线程提交给一个concurrent.futures.ThreadPoolExecutor将行附加到csv文件的行.如果附加是这些线程唯一与文件相关的操作,我该怎么做才能保证线程安全?
我的代码的简化版本:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for count,ad_id in enumerate(advertisers):
downloadFutures.append(executor.submit(downloadThread, arguments.....))
time.sleep(random.randint(1,3))
Run Code Online (Sandbox Code Playgroud)
我的线程类是:
def downloadThread(arguments......):
#Some code.....
writer.writerow(re.split(',', line.decode()))
Run Code Online (Sandbox Code Playgroud)
我应该设置一个单独的单线程执行程序来处理写入,还是担心我是否只是附加?
编辑:我应该详细说明,当写入操作发生时,下一次附加文件之间的分钟数差别很大,我只关心在测试我的脚本时没有发生这种情况,我宁愿为此加以覆盖.
在API级别28(Pie)上,Context该类中引入了一种新方法来获取主线程的Executor getMainExecutor()。
如何使该执行程序的API级别低于28?
executor ×10
java ×7
concurrency ×3
python ×2
android ×1
csv ×1
exception ×1
java-8 ×1
python-3.x ×1
runnable ×1
threadpool ×1
timer ×1