使用在 YARN 集群模式下运行的 spark 2.4.4 和 spark FIFO 调度程序。
我正在使用具有可变线程数的线程池执行程序提交多个 spark 数据帧操作(即将数据写入 S3)。如果我有大约 10 个线程,这可以正常工作,但是如果我使用数百个线程,则会出现死锁,没有根据 Spark UI 安排作业。
哪些因素控制可以同时调度多少作业?驱动程序资源(例如内存/内核)?其他一些火花配置设置?
编辑:
这是我的代码的简要概述
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);
List<Future<Void>> futures = listOfSeveralHundredThings
.stream()
.map(aThing -> ecs.submit(() -> {
df
.filter(col("some_column").equalTo(aThing))
.write()
.format("org.apache.hudi")
.options(writeOptions)
.save(outputPathFor(aThing));
return null;
}))
.collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();
Run Code Online (Sandbox Code Playgroud)
在某些时候,随着nThreads增加,spark 似乎不再安排任何作业,如下所示:
ecs.poll(...) 最终超时nThreads没有正在运行的作业 ID 的正在运行的查询 …假设我想制作一个执行以下操作的Clojure宏:
If x is a list calling the function "bar"
return :foobar
else
return x as a string
Run Code Online (Sandbox Code Playgroud)
但是,bar没有定义; 相反,它只在宏内部使用,如下所示:
(foo (bar))
:foobar
(foo 1)
"1"
Run Code Online (Sandbox Code Playgroud)
人们可以这样做:
(defmacro foo [x]
(if (and (coll? x) (= (first x) 'bar))
:foobar
(str x)))
Run Code Online (Sandbox Code Playgroud)
这适用于(bar)案例以及文字.但是,符号不能按预期工作,给出符号名称而不是其关联值:
user=> (def y 2)
#'user/y
user=> (foo y)
"y"
Run Code Online (Sandbox Code Playgroud)
可以在传递eval函数x之前调用该函数str,但这在使用函数时会导致问题let:
user=> (let [a 3 b (foo a)] b)
java.lang.UnsupportedOperationException: Can't eval locals (NO_SOURCE_FILE:89)
Run Code Online (Sandbox Code Playgroud)
据推测,这个问题与符号解析有关,所以也许我们尝试用syntax-quote来解决问题:
(defmacro foo …Run Code Online (Sandbox Code Playgroud)