为数据的并行处理选择最佳线程数

Ada*_*fin 6 java parallel-processing performance multithreading future

假设我有一个处理100万句话的任务.

对于每个句子,我需要对它做一些事情,无论处理它们的具体顺序如何.

在我的Java程序中,我有一组从我的主要工作块中划分出来的一组未来,它用一个可调用来定义要在一大块句子上完成的工作单元,我正在寻找一种优化线程数量的方法分配工作通过大块的句子,然后重新组合每个线程的所有结果.

在我看到收益递减之前,我可以使用的最大线程数是多少?

另外,是什么原因导致逻辑分配的线程越多,即一次完成的线程越多,就越不正确?

Ric*_*h E 10

在实践中,可能很难找到最佳线程数,甚至每次运行程序时该数字都可能会有所不同.因此,理论上,最佳线程数将是您机器上的核心数.如果您的核心是"超线程"(如英特尔所说),它可以在每个核心上运行2个线程.然后,在这种情况下,最佳线程数是计算机上核心数的两倍.

Also, what causes the logic that the more threads allocated, i.e. 
more being able to be done at once, to be incorrect?
Run Code Online (Sandbox Code Playgroud)

分配更多线程导致同时完成更多工作的原因是错误的,因为只有1个(如果核心是"超线程"的话,只有2个线程)可以在每个核心上一次运行.

所以假设我有一个不是超线程的四核机器.在这种情况下,我可以同时运行最多4个线程.所以,我的最大吞吐量应该用4个线程来实现.假如我尝试在同一设置上运行8个线程.在这种情况下,内核将来回调度这些线程(通过上下文切换),并阻止一个线程以便让另一个线程运行.因此,最多可以一次运行4个线程的工作.

有关这方面的更多信息,使用Linux内核查找"上下文切换"非常有用.这将为您提供有关此主题的所有信息.

另请注意,称为"用户级线程"和"内核级别线程"的线程之间存在差异.如果您进一步研究这个主题,这是一个重要的区别,但它超出了这个问题的范围.

  • 总而言之:当线程100%CPU绑定(意味着它们只使用CPU并且不会通过等待事件来阻塞)时,最好使用与核心数相同的线程数.当线程*将*经历一些阻塞(在你的情况下由于通过网络获取数据),那么拥有比核心数量更多的线程(没有"精确值" - 必须使用实验来实验找到一个最佳范围)因为当一个线程在网络上等待(被阻塞)时,内核可以安排另一个线程工作,依此类推. (2认同)

cru*_*tex 5

您的负载 I/O 是否受限?I/O 限制意味着 CPU 大部分时间都在等待 I/O 操作完成。添加更多线程意味着,向 I/O 子系统或远程服务器等发送更多请求。这可能会产生积极影响,因为对存储的请求可以重新排序和组合(分散收集),但只有在达到最大可能的 I /O 带宽。添加更多线程也可能产生不利影响,例如在传统硬盘上执行更多随机 I/O 请求时。

如果您的负载受 I/O 限制,您可以采用各种方法来优化 I/O 操作。如果可能的话,我的第一个选择是以更大的块和更流式的方式加载数据。下一步是使用外部索引结构或数据库,如果您有很多点访问或更多磁盘,如果只是带宽丢失。无论如何,优化 I/O 是另一个广泛的话题......

您的负载 CPU 是否受限制?这意味着处理 CPU 的能力是限制因素,而不是 I/O 带宽。在这种情况下,优化 I/O 子系统毫无意义,您需要更多或更快的 CPU,并且需要分配负载。

在您的特定情况下,您可以将所有数据加载到内存中,然后您的负载仅受 CPU 限制。对于 CPU 绑定负载,最好使用与机器中 CPU 内核数相同的线程数。选择 CPU 数量作为线程数是相当直接和明显的。它也在问题Optimal number of threads per core 中进行了讨论

实际上,要在 Callable 对象中执行您的任务,请使用以这种方式构造的 ExecutorService:

  int maxThreadCount = Runtime.getRuntime().availableProcessors();
  ExecutorService executor = 
    new ThreadPoolExecutor(
      0, maxThreadCount - 1,
      1, TimeUnit.SECONDS,
      new LinkedBlockingDeque<>(maxThreadCount * 2),
      Executors.defaultThreadFactory(),
      new ThreadPoolExecutor.CallerRunsPolicy());
Run Code Online (Sandbox Code Playgroud)

现在通过添加您的任务进行处理并等待一切完成:

  while (moreToDo) {
    Callable c =...
    executor.submit(c);
  }
  executor.shutdown();
  executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Run Code Online (Sandbox Code Playgroud)

线程池参数有点棘手。这里有一个详细的解释:

new ThreadPoolExecutor.CallerRunsPolicy()当池中的所有线程都在使用时,通过使用任务生成器线程将停止生成新任务。更准确地说,当达到队列限制时,调用线程也将执行任务。

maxThreadCount - 1: 由于我们也使用调用者线程,线程池大小减一。

new LinkedBlockingDeque<>(maxThreadCount * 2):对于阻塞队列的队列大小,选择一个较小的值,其想法是,通过在队列中放置一些任务,池线程在调用者线程本身处理作业时获得新作业。如果任务在运行时间上非常不规则,这并不完美。ThreadPoolExecutor对于这个用例,应该有一个更清晰的方法。更好的方法是使用 aSnychronosQueue并使提交等待直到线程可用。但是,ThreadPoolExecutor它没有“始终排队”操作模式,相反,它会尝试排队并在当前无法排队时调用 RejectionPolicy。

这应该在您的场景中完成。

当您事先不知道是 CPU 密集型还是 I/O 密集型时,可能会出现负载,并且更复杂的是,负载可能会在处理过程中改变其行为。我解决这个问题的想法是使用类似于TCP 拥塞避免算法中的方法的自适应算法。TCP 中的拥塞避免是完全相同的问题:“我想要最大吞吐量,但我不知道我的资源”。有人做过这方面的工作吗?