我有关于ThreadPoolExecutor的这个相当简单的问题.我有以下情况:我必须从队列中使用对象,为它们创建适当的工作人员任务并将它们提交给ThreadPoolExecutor.这很简单.但在关闭情况下,许多工作人员可能会排队等待执行.由于其中一个任务可能运行了一个小时,并且我希望相对快速地正常关闭应用程序,我想从ThreadPoolExecutor中丢弃所有排队的任务,而已经处理的任务应该正常完成.
ThreadPoolExecutor文档有一个remove()方法,但只允许删除特定的任务.purge()仅适用于已取消的Future任务.我的想法是清除队列中包含所有排队的任务.ThreadPoolExecutor提供对此内部队列的访问,但文档指出:
方法getQueue()允许访问工作队列以进行监视和调试.强烈建议不要将此方法用于任何其他目的.
所以抓住这个队列并清除它不是一个选择.此外,该文档的片段说:
当大量排队的任务被取消时,两个提供的方法remove(java.lang.Runnable)和purge()可用于协助存储回收.
怎么样?当然,我可以维护我提交给执行程序的所有任务的列表,在关闭的情况下,我遍历所有条目并使用remove()方法将它们从ThreadPoolExecutor中删除......但是...来吧,这是一个浪费记忆力和维护这份清单的麻烦.(例如,删除已执行的任务)
我感谢任何提示或解决方案!
我试图了解如何在使用Spring进行事务管理的Java应用程序中实现线程.我在Spring文档中找到了TaskExecutor部分,而ThreadPoolTaskExecutor看起来很符合我的需求;
ThreadPoolTaskExecutor类
此实现只能在Java 5环境中使用,但也是该环境中最常用的实现.它公开了bean属性,用于配置java.util.concurrent.ThreadPoolExecutor并将其包装在TaskExecutor中.如果您需要一些高级的东西,例如ScheduledThreadPoolExecutor,建议您使用ConcurrentTaskExecutor.
但是我不知道如何使用它.我一直在寻找好的例子现在没有运气.如果有人能帮助我,我会很感激.
我经常搜索,但找不到解决问题的方法.
我有自己的类,BaseTask使用a ThreadPoolExecutor来处理任务.
如果我不想要优先级(即使用a PriorityBlockingQueue),这可以正常工作,但当我尝试使用ClassCastException我得到的ThreadPoolExecutor因为FutureTask将我的任务包装到一个FutureTask对象中.
这显然是可以的,因为Comparable它没有实现newTaskFor(),但我将如何继续解决优先级问题?
我读过您可以覆盖ThreadPoolExecutor在BaseTask,但我似乎无法在所有发现这个方法...?
我们欢迎所有的建议!
一些代码可以帮助:
在我的BaseFutureTask班上,我有
private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
1, Integer.MAX_VALUE, …Run Code Online (Sandbox Code Playgroud) 我的问题是:使用它是否有意义Executors.newFixedThreadPool(1)??.在两个线程(main + oneAnotherThread)场景中,使用执行程序服务是否有效?是否通过调用new Runnable(){ }比使用ExecutorService更好地直接创建新线程?在这种情况下使用ExecutorService有什么好处和缺点?
PS:主线程和oneAnotherThread不访问任何公共资源.
我经历过:使用ExecutorService有什么好处?.并且一次只有一个线程!
假设我在Spark Streaming应用程序中有2个或更多执行程序.
我将批处理时间设置为10秒,因此每隔10秒就会启动一个作业,从HDFS读取输入.
如果每个作业持续时间超过10秒,则启动的新作业将分配给免费执行者吗?
即使前一个没完成?
我知道这似乎是一个明显的答案,但我在网站或与Spark Streaming相关的论文中没有找到任何关于作业安排的信息.
如果你知道所有这些事情都被解释的链接,我真的很感激看到它们.
谢谢.
这是中提到的Spring文档是:
ThreadPoolTaskScheduler实际上也实现了Spring的TaskExecutor接口,因此单个实例可以尽快用于异步执行,也可以用于计划和可能重复执行.
那么我们想要在ThreadPoolTaskExecutor实例上使用实例的场景是哪些ThreadPoolTaskScheduler?
我目前正在使用Spring XML.我正在创建ThreadPoolTaskScheduler如下bean :
<task:scheduler id="myScheduler" pool-size="1"/>
Run Code Online (Sandbox Code Playgroud)
而ThreadPoolTaskExecutor实例的bean 可以创建为
<task:executor id="executor" pool-size="10"/>
Run Code Online (Sandbox Code Playgroud) 我是YARN上的Spark的新手,并不了解YARN Containers和Spark 之间的关系Executors.我根据yarn-utils.py脚本的结果尝试了以下配置,可用于查找最佳群集配置.
Hadoop集群(HDP 2.4)我正在研究:
所以我跑了python yarn-utils.py -c 12 -m 64 -d 4 -k True(c =核心,m =内存,d = hdds,k = hbase-installed)并得到以下结果:
Using cores=12 memory=64GB disks=4 hbase=True
Profile: cores=12 memory=49152MB reserved=16GB usableMem=48GB disks=4
Num Container=8
Container Ram=6144MB
Used Ram=48GB
Unused Ram=16GB
yarn.scheduler.minimum-allocation-mb=6144
yarn.scheduler.maximum-allocation-mb=49152
yarn.nodemanager.resource.memory-mb=49152
mapreduce.map.memory.mb=6144
mapreduce.map.java.opts=-Xmx4915m
mapreduce.reduce.memory.mb=6144
mapreduce.reduce.java.opts=-Xmx4915m
yarn.app.mapreduce.am.resource.mb=6144
yarn.app.mapreduce.am.command-opts=-Xmx4915m
mapreduce.task.io.sort.mb=2457
Run Code Online (Sandbox Code Playgroud)
我通过Ambari界面进行的这些设置并重新启动了群集.这些值也大致与我之前手动计算的值相匹配. …
containers executor hadoop-yarn hortonworks-data-platform apache-spark
我使用线程池来执行任务,这些任务主要是基于cpu的I/O,大小比cpus的数量大1.
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1)
Run Code Online (Sandbox Code Playgroud)
简单的程序的假设情况下提交的所有任务,在此执行程序并没有什么别的我假设有一个线程池的任何较大会减慢事情,因为操作系统将不得不时间片它的CPU往往机会给每个线程的线程池机会跑.
这是正确的,如果这是一个真正的问题或主要是理论上的,即如果我将线程池大小增加到1000我会注意到一个巨大的差异.
我想在Executor中使用asyncio调用loop.run_in_executor启动阻塞函数,然后稍后取消它,但这对我来说似乎不起作用.
这是代码:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_func(seconds_to_block):
for i in range(seconds_to_block):
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
@asyncio.coroutine
def non_blocking_func(seconds):
for i in range(seconds):
print('yielding {}/{}'.format(i, seconds))
yield from asyncio.sleep(1)
print('done non blocking {}'.format(seconds))
@asyncio.coroutine
def main():
non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
blocking_future = loop.run_in_executor(None, blocking_func, 5)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel()
yield from asyncio.wait(non_blocking_futures)
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
我希望上面的代码只允许阻塞函数输出:
blocking 0/5
blocking …Run Code Online (Sandbox Code Playgroud) 我正在使用databricks spark cluster(AWS),并测试我的scala实验.使用LogisticRegressionWithLBFGS算法训练10 GB数据时遇到了一些问题.我遇到问题的代码块如下:
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
val algorithm = new LogisticRegressionWithLBFGS()
algorithm.run(training_set)
Run Code Online (Sandbox Code Playgroud)
首先,我有很多执行程序丢失失败和java内存问题,然后我用更多分区重新分区我的training_set并且内存不足问题已经消失,但仍然得到执行程序丢失失败.
我的群集共有72个核心和500GB内存.任何人都能对此有所了解吗?
executor ×10
java ×5
apache-spark ×3
threadpool ×3
spring ×2
concurrency ×1
containers ×1
event-loop ×1
futuretask ×1
hadoop-yarn ×1
python ×1
scala ×1
task ×1