我们有一个Web应用程序需要通过http从合作伙伴站点导入10-20个图像.如果我有一个表示我想下载的网址的字符串列表,是否有人建议如何尽快下载它们?
我可以把它们放在for循环中,但是如果有一种简单的方法来并行化它,那么最终用户可能会有好处.我想避免使用直接的Java线程,尽管执行程序框架可能是个好主意.
有任何想法吗?
我正在ThreadPoolExecutor中运行多个任务。我将其初始化如下:
private VideoExportExecutor executor;
private BlockingQueue<Runnable> jobQueue;
public void initialiseVideoProcessor()
{
jobQueue = new LinkedBlockingQueue<Runnable>();
executor = new VideoExportExecutor(1, 1, Long.MAX_VALUE, TimeUnit.SECONDS, jobQueue);
}
Run Code Online (Sandbox Code Playgroud)
我已经实现了runnable(VideoExportThread)的实现,其中包含一种getProgress()跟踪提交任务进度的方法。我提交如下实例:
executor.submit(new VideoExportThread(gcmPath));
Run Code Online (Sandbox Code Playgroud)
我正在查询一种查询executor / blockingQueue的当前/待处理线程的方法。我试图使用jobQueue.toArray()和重写executor方法,beforeExecute(Thread t, Runnable r)但是在两种情况下,返回的runnable都是FutureTask类型,它不包含太多数据。我是否可以使用它来检索我的原始VideoExportThread实例,以识别正在运行的实例并查询其进度?
谢谢
我的应用程序正在积累很多ThreadGC无法接收和清除的实例.从长远来看,这种内存泄漏会使应用程序崩溃.
我不是100%肯定他们来自哪里,但我有一种明显的感觉,以下可能是有问题的代码:
public class UraHostHttpConnection extends AbstractUraHostConnection {
private Handler uiThreadHandler = new Handler(Looper.getMainLooper());
private Executor taskExecutor = new Executor() {
public void execute(Runnable command) {
new Thread(command).start();
}
};
private ConnectionTask task = null;
@Override
public void sendRequest(final HttpUriRequest request) {
this.task = new ConnectionTask();
this.uiThreadHandler.post(new Runnable() {
public void run() {
task.executeOnExecutor(taskExecutor, request);
}
});
}
@Override
public void cancel() {
if (this.task != null)
this.task.cancel(true);
}
}
Run Code Online (Sandbox Code Playgroud)
这段代码允许我并行运行几个HTTP连接,这些连接在默认情况下不会相互阻塞AsyncTask Executor(这只是一个单线程队列). …
我试图找出一种方法来处理多线程设置中的异常.我想并行执行某些任务,每个任务都可能抛出一个我需要做出反应的异常(基本上,通过将失败的任务放回执行队列).但是,实际上从线程中获取异常的唯一方法是创建Future并调用其get()方法.但是,这实际上将调用转换为同步调用.
也许一些代码会说明这一点:
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
Task task = taskQueue.poll(); // let's assume that task implements Runnable
try {
executor.execute(task);
}
catch(Exception ex) {
// record the failed task, so that it can be re-added to the queue
}
Run Code Online (Sandbox Code Playgroud)
但是,在这种情况下,所有任务都会启动,但这里的异常似乎没有被捕获到此catch块中.
另一种方法是使用Future而不是线程并检索其结果:
try {
Future<?> future = executor.submit(task);
future.get();
}
...
Run Code Online (Sandbox Code Playgroud)
在这种情况下,异常会在catch块中被捕获,但代价是必须等到此操作完成.因此,根据需要,任务按顺序执行而不是并行执行.
我错过了什么?如何捕捉每个人的任务例外并对他们做出反应?
请考虑以下Java代码
void doSomething(Runnable r1, Runnable r2){
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(r1);
executor.execute(r2);
}
Run Code Online (Sandbox Code Playgroud)
当我调用doSomething方法时,创建执行程序并依次顺序执行任务r1和r2.
我的问题是:一旦两个任务r1和r2终止会发生什么?
我想执行器对象将被垃圾收集,但我不知道它是否也会被关闭.如果执行程序为其执行创建了一个新线程,该线程是否会导致资源泄漏?
在spark2.0中,我有两个数据框,我需要首先将它们加入并做一个reduceByKey来聚合数据。我总是在执行器中有OOM。提前致谢。
d1(1G,5亿行,已缓存,由col id2分区)
id1 id2
1 1
1 3
1 4
2 0
2 7
...
Run Code Online (Sandbox Code Playgroud)
d2(160G,200万行,已缓存,由col id2分区,值col包含5000个浮点数的列表)
id2 value
0 [0.1, 0.2, 0.0001, ...]
1 [0.001, 0.7, 0.0002, ...]
...
Run Code Online (Sandbox Code Playgroud)
现在我需要加入两个表以获取d3并使用spark.sql
select d1.id1, d2.value
FROM d1 JOIN d2 ON d1.id2 = d2.id2
Run Code Online (Sandbox Code Playgroud)
然后在d3上执行reduceByKey并汇总表d1中每个id1的值
d4 = d3.rdd.reduceByKey(lambda x, y: numpy.add(x, y)) \
.mapValues(lambda x: (x / numpy.linalg.norm(x, 1)).toList)\
.toDF()
Run Code Online (Sandbox Code Playgroud)
我估计d4的大小为340G。现在我在r3.8xlarge机器上使用以运行作业
mem: 244G
cpu: 64
Disk: 640G
Run Code Online (Sandbox Code Playgroud)
我玩了一些配置,但是执行器中总是有OOM。所以,问题是
是否可以在当前类型的机器上运行此作业?或者我应该只使用更大的机器(多大?)。但是我记得我曾经遇到过一些文章/博客,它们说使用相对较小的机器来进行TB级的处理。
我应该做什么样的改善?例如火花配置,代码优化?
是否可以估计每个执行器所需的内存量?
我尝试过的一些Spark配置
config1:
--verbose
--conf spark.sql.shuffle.partitions=200 …Run Code Online (Sandbox Code Playgroud) 我有两个线程,我想确保我在 LinkedBlockingQueue 上正确执行同步。这是正确的吗?或者 (messageToCommsQueue) 上的显式同步不需要吗?
宣言:
private LinkedBlockingQueue<BaseMessage> messagesToCommsQueue;
Run Code Online (Sandbox Code Playgroud)
方法一:
private void startOperationModeStatusMessageExecutor() {
ScheduledExecutorService operationModeStatusExecutor = Executors.newSingleThreadScheduledExecutor();
operationModeStatusExecutor.scheduleAtFixedRate((new Runnable() {
@Override
public void run() {
ModeStatusMessage commsOperateMsg;
commsOperateMsg = MessageFactory.getModeStatusMessage(status.ordinal());
synchronized (messagesToCommsQueue) {
messagesToCommsQueue.add(commsOperateMsg);
}
}
}), 0, 10, TimeUnit.SECONDS);
}
Run Code Online (Sandbox Code Playgroud)
方法二:
Executor commsSenderExecutor = Executors.newSingleThreadExecutor();
commsSenderExecutor.execute(new Runnable() {
@Override
public void run() {
while (getStatus().equals(ModeStatus.INITIATE) || getStatus().equals(ModeStatus.OPERATE)) {
BaseMessage m = null;
try {
synchronized (messagesToCommsQueue) {
m = messagesToCommsQueue.take();
}
} catch (InterruptedException e) {
// TODO …Run Code Online (Sandbox Code Playgroud) 我Executor特意使用框架Executors.newCachedThreadPool();
我有一个Runnable例如100
的列表.前50个,每个创建一个值(存储在列表中)以供最后50个使用.
我想如果我按顺序传递Runnables executor.execute()它们在列表中,它们也将以相同的顺序执行.
但这不会发生.
这些任务似乎是以随机顺序执行的,它们是交错的,不是按顺序执行的.
这是假设工作的方式吗?有办法解决这个问题吗?
谢谢
我知道通常注入器应该在所有应用程序中仅使用一次(在启动时)。但我有以下用例。我为 Executor 注入任务实现,然后在该任务中我有FileHandler每次都必须实例化的依赖项(可以说)。我知道方法是注入提供程序(可以说FileHandlerProvider),每次请求时都会返回新实例。问题是FileHandler它自己有很多依赖项(可以说Parser,OutputPrinter......)。现在,这些每次也需要新的实例(因为实现可能有一些状态,例如计数器,并且在下一个线程运行时重用同一实例将是一个问题)。问题是,注入提供程序后,会重用提供程序的相同实例,因此FileHandler总是使用 sameParser和创建 new OuputPrinter。解决方案可能是再次注入ParserProvider andOutputPrinterProvider而不是 Parser 和OuputPrinterin FileHandlerProvider,但这是不对的,它很快就会变得太复杂,因为有更多的依赖项。我现在看到的唯一简单的解决方案可能是使用 Injector in FileHandlerProvider,它将返回新实例FileHandler(以及依赖项的新实例)。或者在这种情况下也许还有另一种更优雅的解决方案?
我在 YARN 集群 (HDP 2.4) 中使用 Spark,设置如下:
当我使用命令运行我的 spark 应用程序时,spark-submit --num-executors 10 --executor-cores 1 --executor-memory 5g ...Spark 应该为每个执行程序提供5 GB 的 RAM(由于一些开销内存约为 10%,我将内存设置为 5g)。
但是当我查看 Spark UI 时,我看到每个执行程序只有 3.4 GB 的内存,请看截图:
有人可以解释为什么分配的内存这么少吗?
containers executor hadoop-yarn hortonworks-data-platform apache-spark
executor ×10
java ×6
apache-spark ×2
android ×1
containers ×1
download ×1
executors ×1
future ×1
futuretask ×1
guice ×1
hadoop-yarn ×1
provider ×1
task ×1