我对生产者 - 消费者模式的理解是,它可以使用生产者和消费者之间共享的队列来实现.生产者将工作提交给共享队列,消费者检索它并处理它.它也可以由生产者直接提交给消费者来实现(生产者线程直接提交给Consumer的执行者服务).
现在,我一直在查看Executors类,它提供了一些线程池的常见实现.根据规范,方法newFixedThreadPool"重用在共享无界队列中运行的固定数量的线程".他们在这里谈论哪个队列?
如果Producer直接向使用者提交任务,那么它是包含Runnables列表的ExecutorService的内部队列吗?
或者它是中间队列,以防生产者提交到共享队列?
可能是我错过了重点,但请有人澄清一下吗?
我有一系列Runnable要执行的任务(即s)Executor.
每个任务都要求某个条件有效才能继续.我很想知道是否有办法以某种方式配置Executor在队列末尾移动任务,并在条件有效且任务能够执行和完成时尝试执行它们.
所以行为是这样的:
Thread-1从队列中获取任务run并被调用run条件尚未生效Thread-1任务放在队列的末尾,并执行下一个任务Thread-X(来自线程池)从队列条件再次选择任务是有效的并且正在执行任务我对以下内容感到困惑:
要在Java程序中使用线程,最简单的方法是扩展Thread类并实现runnable接口(或简单地实现runnable).
启动线程的执行.我们必须调用Thread的方法start(),然后调用线程的方法run().所以线程开始了.
方法start()(除非我错了)必须完全调用,每个线程只调用一次.因此,线程实例不能被重用,除非某种方式运行方法本身在某个短的无限循环中运行,这有利于自定义实现线程的重用.
现在javadoc
链接文本
说
如果可用,执行调用将重用先前构造的线程
我不明白这是如何实现的.我在执行方法的execute方法中提供了我的自定义线程,例如
ExecutorService myCachedPool = Executors.newCachedThreadPool();
myCachedPool.execute(new Runnable(){public void run(){
//do something time consuming
}});
Run Code Online (Sandbox Code Playgroud)
如何重用我删除到执行程序框架的自定义线程?
Executor是否允许调用方法start()超过1次,而我们不能在我们的程序中?我误会了什么吗?
谢谢.
我有一个singleThreadExecutor,以执行我按顺序提交给它的任务,即一个接一个的任务,没有并行执行.
我有runnable,这是这样的
MyRunnable implements Runnable {
@Override
public void run() {
try {
Thread.sleep(30000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)
}
例如,当我向上述单线程执行程序提交三个MyRunnable实例时,我希望第一个任务执行,因为Thread.sleep在TIMED_WAITING中有执行线程(我可能错误的具体州).其他两个任务不应该分配线程来执行它们,至少在第一个任务完成之前.
所以我的问题是如何通过FutureTask API获取此状态或以某种方式到达执行任务的线程(如果没有这样的线程然后任务等待执行或挂起)并获得其状态或者可能由某些其他方式?
FutureTask只定义了isCanceled()和isDone()方法,但这些方法还不足以描述Task的所有可能的执行状态.
如果我使用 a,ThreadPoolExecutor我有多种构造函数,并且我可以传递/使用我自己的队列作为池的工作队列。
现在我看到 aScheduledThreadPoolExecutor是 a 的子类ThreadPoolExecutor,但构造函数要少得多。
有没有办法使用ScheduledThreadPoolExecutor并且仍然使用我自己的工作队列?
I\xe2\x80\x99m 尝试在 Yarn 客户端上运行 Spark 作业。我有两个节点,每个节点都有以下配置。\n
I\xe2\x80\x99m 得到 \xe2\x80\x9cExecutorLostFailure (执行器 1 丢失)\xe2\x80\x9d。
\n\n我已经尝试了大部分 Spark 调优配置。我已经减少到 1 个执行程序丢失,因为最初我有 6 个执行程序失败。
\n\n这些是我的配置(我的火花提交):
\n\n\n\n\nHADOOP_USER_NAME=hdfs Spark-submit --class genkvs.CreateFieldMappings\n --master 纱线客户端 --driver-内存 11g --executor-内存 11G --total-executor-cores 16 --num-executors 15 --conf " Spark.executor.extraJavaOptions=-XX:+UseCompressedOops\n -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" --conf Spark.akka.frameSize=1000 --conf Spark.shuffle.memoryFraction=1 --conf\n Spark .rdd.compress=true --conf\n spark.core.connection.ack.wait.timeout=800\n my-data/lookup_cache_spark- assembly-1.0-SNAPSHOT.jar -h\n hdfs://hdp-node -1.zone24x7.lk:8020 -p 800
\n
我的数据大小是 6GB,我在工作中执行 groupBy 操作。
\n\ndef process(in: RDD[(String, String, Int, String)]) = {\n in.groupBy(_._4)\n}\nRun Code Online (Sandbox Code Playgroud)\n\n我\xe2\x80\x99m是Spark的新手,请帮我找出我的错误。我\xe2\x80\x99m已经挣扎了至少一周了。
\n\n预先非常感谢您。 …
在spark2.0中,我有两个数据框,我需要首先将它们加入并做一个reduceByKey来聚合数据。我总是在执行器中有OOM。提前致谢。
d1(1G,5亿行,已缓存,由col id2分区)
id1 id2
1 1
1 3
1 4
2 0
2 7
...
Run Code Online (Sandbox Code Playgroud)
d2(160G,200万行,已缓存,由col id2分区,值col包含5000个浮点数的列表)
id2 value
0 [0.1, 0.2, 0.0001, ...]
1 [0.001, 0.7, 0.0002, ...]
...
Run Code Online (Sandbox Code Playgroud)
现在我需要加入两个表以获取d3并使用spark.sql
select d1.id1, d2.value
FROM d1 JOIN d2 ON d1.id2 = d2.id2
Run Code Online (Sandbox Code Playgroud)
然后在d3上执行reduceByKey并汇总表d1中每个id1的值
d4 = d3.rdd.reduceByKey(lambda x, y: numpy.add(x, y)) \
.mapValues(lambda x: (x / numpy.linalg.norm(x, 1)).toList)\
.toDF()
Run Code Online (Sandbox Code Playgroud)
我估计d4的大小为340G。现在我在r3.8xlarge机器上使用以运行作业
mem: 244G
cpu: 64
Disk: 640G
Run Code Online (Sandbox Code Playgroud)
我玩了一些配置,但是执行器中总是有OOM。所以,问题是
是否可以在当前类型的机器上运行此作业?或者我应该只使用更大的机器(多大?)。但是我记得我曾经遇到过一些文章/博客,它们说使用相对较小的机器来进行TB级的处理。
我应该做什么样的改善?例如火花配置,代码优化?
是否可以估计每个执行器所需的内存量?
我尝试过的一些Spark配置
config1:
--verbose
--conf spark.sql.shuffle.partitions=200 …Run Code Online (Sandbox Code Playgroud) 我正在 Spark (v2+) 中寻找一种可靠的方法来以编程方式调整会话中执行器的数量。
我了解动态分配以及在创建会话时配置 Spark 执行器的能力(例如使用--num-executors),但是由于我的 Spark 作业的性质,这些选项对我来说都不是很有用。
该作业对大量数据执行以下步骤:
我很感激我可以将此作业分成两个作业,分别使用不同的 Spark 资源配置文件执行,但我真正想要的是在 Spark 脚本中的特定点(在 Elasticsearch 加载开始之前)以编程方式将执行器的数量设置为 X )。这似乎是一件普遍可以做的有用的事情。
我尝试了一些更改设置,发现了一些可行的方法,但感觉像是一种老套的方式来做一些应该可以以更标准化和受支持的方式完成的事情。
我的尝试(这只是我的尝试):
def getExecutors = spark.sparkContext.getExecutorStorageStatus.toSeq.map(_.blockManagerId).collect {
case bm if !bm.isDriver => bm
}
def reduceExecutors(totalNumber: Int): Unit = {
//TODO throw error if totalNumber is more than current
logger.info(s"""Attempting to reduce number of executors to …Run Code Online (Sandbox Code Playgroud) 我...真的不知道如何更好地说出标题.但基本上,我所拥有的是一个线程池,所有这些线程忙着工作.我希望他们按照分配的顺序报告他们的结果,但同时,我想分批工作.为了说明,并举例说明
ExecutorService exec = Executors.newFixedThreadPool(8);
class MyCallable implements Callable<byte[]> {
private final int threadnumber;
MyCallable(int threadnumber){
this.threadnumber = threadnumber;
}
public byte[] call() {
//does something
}
}
List<Callable<byte[]>> callables = new ArrayList<Callable<byte[]>>();
for(int i=1; i<=20; i++) {
callables.add(new MyCallable(i));
}
try {
List<Future<byte[]>> results = exec.invokeAll(callables);
for(Future<byte[]> result: results) {
System.out.write(result.get(), 0, result.get().length);
}
Run Code Online (Sandbox Code Playgroud)
基本上,池线程有8个线程,我最终有20个任务(这些只是示例).如果我理解正确的话,它现在的工作方式是它等待所有20个任务完成后再按顺序输出它们(从1到20).该程序应该做的是输出连续的字节流(由线程处理,因为我需要保持顺序完整,我使用了未来的接口).虽然我不介意等到完成所有20个任务,但无论如何都要让线程按顺序输出.
如果没有办法,或者我只是完全误解了invokeAll的工作方式,那么澄清也是受欢迎的.提前谢谢!执行官有点混乱,因为我刚刚了解了它们.
随机附录,我甚至可以从一个可调用的函数返回一个字节数组?
Spark架构完全围绕执行器和核心的概念.我想看看在集群中运行的spark应用程序运行了多少执行程序和核心.
我试图在我的应用程序中使用下面的代码段,但没有运气.
val conf = new SparkConf().setAppName("ExecutorTestJob")
val sc = new SparkContext(conf)
conf.get("spark.executor.instances")
conf.get("spark.executor.cores")
Run Code Online (Sandbox Code Playgroud)
有没有办法使用SparkContextObject或SparkConfobject等获取这些值.
executors ×10
java ×6
apache-spark ×4
concurrency ×3
scala ×3
hadoop ×2
threadpool ×2
dynamic ×1
executor ×1
future ×1
futuretask ×1
python ×1