Python multiprocessing: understanding logic behind `chunksize`

Bra*_*mon 47 python parallel-processing multiprocessing python-3.x python-multiprocessing

What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()? The .map() method seems to use an arbitrary heuristic for its default chunksize (explained below); what motivates that choice, and is there a more thoughtful approach based on some particular situation/setup?

Example - say that I am:

  • Passing an iterable to .map() that has ~15 million elements;
  • Working on a machine with 24 cores and using the default processes = os.cpu_count() inside multiprocessing.Pool().

My naive thinking is to give each of the 24 workers an equally-sized chunk, i.e. 15_000_000 / 24 or 625,000. Large chunks should reduce turnover/overhead while fully utilizing all workers. But it seems that this is missing some potential downsides of giving large batches to each worker. Is this an incomplete picture, and what am I missing?


Part of my question stems from the default logic for if chunksize=None: both .map() and .starmap() call .map_async(), which looks like this:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

What's the logic behind divmod(len(iterable), len(self._pool) * 4)? This implies that the chunksize will be closer to 15_000_000 / (24 * 4) == 156_250. What's the intention in multiplying len(self._pool) by 4?
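For the example above, plugging those numbers into that default logic gives (a quick, illustrative check):

>>> divmod(15_000_000, 24 * 4)
(156250, 0)
# extra == 0, so the default chunksize stays at 156_250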

This makes the resulting chunksize a factor of 4 smaller than my "naive logic" from above, which consists of just dividing the length of the iterable by the number of workers in pool._pool.

Lastly, there is also this snippet from the Python docs on .imap() that further drives my curiosity:

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
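For illustration, here is a minimal, hypothetical usage sketch of that parameter with .imap() (the square function and the numbers are made up, not from the docs):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # imap() defaults to chunksize=1; a larger value batches taskels
        # per IPC round-trip, which can speed up very long iterables.
        for result in pool.imap(square, range(100_000), chunksize=256):
            pass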


This related answer is helpful but a bit too high-level: Python multiprocessing: why are large chunksizes slower?

Dar*_*aut 92

Short Answer

Pool's chunksize-algorithm is a heuristic. It provides a simple solution for all imaginable problem scenarios you are trying to stuff into Pool's methods. As a consequence, it cannot be optimized for any specific scenario.

The algorithm arbitrarily divides the iterable into approximately four times more chunks than the naive approach. More chunks mean more overhead, but increased scheduling flexibility. As this answer will show, this leads to a higher worker-utilization on average, but without the guarantee of a shorter overall computation time for every case.

"Nice to know" you might think, "but how does knowing this help me with my concrete multiprocessing problems?" Well, it doesn't. The more honest short answer is: "there is no short answer", "multiprocessing is complex" and "it depends". An observed symptom can have different roots, even for similar scenarios.

This answer tries to provide you with basic concepts that help you get a clearer picture of Pool's scheduling black box. It also tries to give you some basic tools for recognizing and avoiding potential cliffs as far as they are related to chunksize.

About this answer

This answer is still a work in progress.

Still missing, to name a few things:

  • A summary section
  • Measures to improve (shorten) readability

If you are reading it now, I suggest just skimming it to get a sense of what to expect, and coming back to work through it once these lines have been removed.

Recent updates (Feb 21):

  • Outsourced chapter 7 into a separate answer, since I ran into the 30000-character limit
  • Added two GIFs to chapter 7, showing Pool's and the naive chunksize-algorithm

It's necessary to clarify some important terms first.


1. Definitions


Chunk

A chunk here is a share of the iterable-argument specified in a pool-method call. How the chunksize gets calculated and what effects this can have is the topic of this answer.


Task

A task's physical representation in a worker-process, in terms of data, can be seen in the figure below.

figure0

The figure shows an example call to pool.map(), displayed along a line of code taken from the multiprocessing.pool.worker function, where a task read from the inqueue gets unpacked. worker is the underlying main-function in the MainThread of a pool-worker-process. The func-argument specified in the pool-method will only match the func-variable inside the worker-function for single-call methods like apply_async and for imap with chunksize=1. For the rest of the pool-methods with a chunksize-parameter, the processing-function func will be a mapper-function (mapstar or starmapstar). This function maps the user-specified func-parameter on every element of the transmitted chunk of the iterable (-> "map-tasks"). The time this takes defines a task also as a unit of work.


Taskel

While the usage of the word "task" for the whole processing of one chunk is matched by code within multiprocessing.pool, there is no indication how a single call to the user-specified func, with one element of the chunk as argument(s), should be referred to. To avoid confusion emerging from naming conflicts (think of the maxtasksperchild-parameter for Pool's __init__-method), this answer will refer to the single units of work within a task as taskels.

A taskel (from task + element) is the smallest unit of work within a task. It is the single execution of the function specified with the func-parameter of a Pool-method, called with arguments obtained from a single element of the transmitted chunk. A task consists of chunksize taskels.
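To make the task/taskel distinction concrete, here is a minimal sketch of how one chunked map-task gets processed. The mapper functions in CPython's multiprocessing/pool.py look roughly like this (simplified; user_func and chunk are made-up names for illustration):

import itertools

def mapstar(args):
    # args == (func, chunk); one call processes a whole chunk == one task
    return list(map(*args))

def starmapstar(args):
    # like mapstar, but unpacks every chunk element as an argument tuple
    return list(itertools.starmap(args[0], args[1]))

def user_func(x):      # hypothetical func passed to pool.map()
    return x * 2

chunk = (3, 4, 5)                    # one chunk of the iterable
print(mapstar((user_func, chunk)))   # three taskels, one task -> [6, 8, 10]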


Parallelization Overhead (PO)

PO consists of Python-internal overhead and overhead for inter-process communication (IPC). The per-task overhead within Python comes with the code needed for packaging and unpackaging the tasks and their results. IPC-overhead comes with the necessary synchronization of threads and the copying of data between different address spaces (two copy steps are needed: parent -> queue -> child). The amount of IPC-overhead is OS-, hardware- and data-size dependent, which makes generalizations about its impact difficult.
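A rough way to make PO visible (an illustrative benchmark sketch of mine, not part of the original answer): time the same trivial workload with different chunksizes and watch the per-task packaging and IPC overhead dominate for small chunks.

from multiprocessing import Pool
from time import perf_counter

def cheap_taskel(x):
    # nearly free computation, so overhead dominates the runtime
    return x + 1

if __name__ == '__main__':
    data = range(100_000)
    with Pool(4) as pool:
        for cs in (1, 100, 10_000):
            start = perf_counter()
            pool.map(cheap_taskel, data, chunksize=cs)
            print(f'chunksize={cs:>6}: {perf_counter() - start:.3f} s')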


2. Parallelization Goals

With multiprocessing, our overall goal (obviously) is to minimize the total processing time for all tasks. To reach this overall goal, our technical goal needs to be optimizing the utilization of hardware resources.

Some important sub-goals for achieving the technical goal are:

  • minimize parallelization overhead (most famously, but not alone: IPC)
  • high utilization across all cpu-cores
  • keeping memory usage limited to prevent the OS from excessive paging (thrashing)

At first, the tasks need to be computationally heavy (intensive) enough, to earn back the PO we have to pay for parallelization. The relevance of PO decreases with increasing absolute computation time per taskel. Or, to put it the other way around, the bigger the absolute computation time per taskel for your problem, the less relevant gets the need for reducing PO. If your computation will take hours per taskel, the IPC overhead will be negligible in comparison. The primary concern here is to prevent idling worker processes after all tasks have been distributed. Keeping all cores loaded means, we are parallelizing as much as possible.


3. Parallelization Scenarios

What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()

The major factor in question is how much computation time may vary across our single taskels. To name it, the choice for an optimal chunksize is determined by the...

Coefficient of Variation (CV) for computation times per taskel.

The two extreme scenarios on a scale, following from the extent of this variation are:

  1. All taskels need exactly the same computation time
  2. A taskel could take seconds or days to finish

For better memorability I will refer to these scenarios as:

  1. Dense Scenario
  2. Wide Scenario

Another determining factor is the number of used worker-processes (backed by cpu-cores). It will become clear later why.


Dense Scenario

In a Dense Scenario it would be desirable to distribute all taskels at once, to keep necessary IPC and context switching at a minimum. This means we want to create only as many chunks as there are worker-processes. As already stated above, the weight of PO increases with smaller computation times per taskel.

For maximal throughput, we also want all worker processes busy until all tasks are processed (no idling workers). For this goal, the distributed chunks should be of equal size or close to it.
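A minimal sketch of that idea (assuming equally heavy taskels; work and the numbers are made up): request roughly one equally-sized chunk per worker by passing an explicit chunksize.

import math
from multiprocessing import Pool

def work(x):               # hypothetical, equally expensive taskel
    return x * x

if __name__ == '__main__':
    data = list(range(1_000_000))
    n_workers = 4
    chunksize = math.ceil(len(data) / n_workers)   # one chunk per worker
    with Pool(n_workers) as pool:
        results = pool.map(work, data, chunksize=chunksize)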


Wide Scenario

The prime example for a Wide Scenario would be an optimization problem, where results either converge quickly or computation can take hours, if not days. It's not predictable what mixture of "light taskels" and "heavy taskels" a task will contain in such a case, hence it's not advisable to distribute too many taskels in a task-batch at once. Distributing less taskels at once than possible, means increasing scheduling flexibility. This is needed here to reach our sub-goal of high utilization of all cores.

If Pool's methods were, by default, totally optimized for the Dense Scenario, they would increasingly create suboptimal timings for every problem located closer to the Wide Scenario.


4. Risks of Chunksize > 1

Consider this simplified pseudo-code example of a Wide Scenario-iterable, which we want to pass into a pool-method:

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]

Instead of the actual values, we pretend to see the needed computation time in seconds, for simplicity only 1 minute or 1 day. We assume the pool has four worker processes (on four cores) and chunksize is set to 2. Because the order will be kept, the chunks sent to the workers will be these:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]

Since we have enough workers and the computation time is high enough, we can say, that every worker process will get a chunk to work on in the first place. (This does not have to be the case for fast completing tasks). Further we can say, the whole processing will take about 86400+60 seconds, because that's the highest total computation time for a chunk in this artificial scenario and we distribute chunks only once.

Now consider this iterable, which only has one position switched compared to the one before:

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

...and the corresponding chunks:

[(60, 60), (86400, 86400), (60, 60), (60, 84600)]

Just bad luck with the sorting of our iterable nearly doubled (86400+86400) our total processing time! The worker getting the vicious (86400, 86400)-chunk is blocking the second heavy taskel in its task from getting distributed to one of the idling workers already finished with their (60, 60)-chunks. We obviously would not risk such an unpleasant outcome if we set chunksize=1.
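A toy greedy-scheduling simulation (not part of the original answer; it assumes each free worker immediately pulls the next undistributed chunk) reproduces both totals:

import heapq

def makespan(chunks, n_workers=4):
    """Total wall time if each free worker greedily pulls the next chunk."""
    finish_times = [0] * n_workers
    heapq.heapify(finish_times)
    for chunk in chunks:
        earliest = heapq.heappop(finish_times)     # earliest free worker
        heapq.heappush(finish_times, earliest + sum(chunk))
    return max(finish_times)

good_chunks = [(60, 60), (86400, 60), (86400, 60), (60, 84600)]
bad_chunks = [(60, 60), (86400, 86400), (60, 60), (60, 84600)]
print(makespan(good_chunks))   # 86460  (~1 day)
print(makespan(bad_chunks))    # 172800 (~2 days)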

This is the risk of bigger chunksizes. With higher chunksizes we trade scheduling flexibility for less overhead and in cases like above, that's a bad deal.

As we will see in chapter 6. Quantifying Algorithm Efficiency, bigger chunksizes can also lead to suboptimal results for Dense Scenarios.


5. Pool's Chunksize Algorithm

Below you will find a slightly modified version of the algorithm from inside the source code. As you can see, I cut off the lower part and wrapped it into a function for calculating the chunksize argument externally. I also replaced 4 with a factor parameter and outsourced the len() calls.

# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

To ensure we are all on the same page, here's what divmod does:

divmod(x, y) is a builtin function which returns (x//y, x%y). x // y is the floor division, returning the rounded-down quotient of x / y, while x % y is the modulo operation returning the remainder of x / y. Hence e.g. divmod(10, 3) returns (3, 1).

Now when you look at chunksize, extra = divmod(len_iterable, n_workers * 4), you will notice that n_workers here is the divisor y in x / y, and multiplication by 4, without further adjustment through if extra: chunksize += 1 later on, leads to an initial chunksize at least four times smaller (for len_iterable >= n_workers * 4) than it would be otherwise.

For viewing the effect of multiplication by 4 on the intermediate chunksize result consider this function:

def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors by
    which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

The function above calculates the naive chunksize (cs_naive) and the first-step chunksize of Pool's chunksize-algorithm (cs_pool1), as well as the chunksize for the complete Pool-algorithm (cs_pool2). Further it calculates the real factors rf_pool1 = cs_naive / cs_pool1 and rf_pool2 = cs_naive / cs_pool2, which tell us how many times the naively calculated chunksizes are bigger than Pool's internal version(s).
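For instance, evaluating it for the question's setting and for a smaller iterable (these values follow directly from the functions above):

>>> compare_chunksizes(15_000_000, n_workers=24)
(625000, 156250, 156250, 4.0, 4.0)
>>> compare_chunksizes(500, n_workers=4)
(125, 31, 32, 4.032258064516129, 3.90625)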

Below you see two figures created with output from this function. The left figure just shows the chunksizes for n_workers=4 up until an iterable length of 500. The right figure shows the values for rf_pool1. For iterable length 16, the real factor becomes >= 4 (for len_iterable >= n_workers * 4) and its maximum value is 7 for iterable lengths 28-31. That's a massive deviation from the original factor 4 the algorithm converges to for longer iterables. 'Longer' here is relative and depends on the number of specified workers.

figure1

Remember that chunksize cs_pool1 still lacks the extra-adjustment with the remainder from divmod contained in cs_pool2 from the complete algorithm.

The algorithm goes on with:

if extra:
    chunksize += 1

Now in cases where there is a remainder (an extra from the divmod-operation), increasing the chunksize by 1 obviously cannot work out for every task. After all, if it would, there would not be a remainder to begin with.

As you can see in the figures below, the "extra-treatment" has the effect that the real factor for rf_pool2 now converges towards 4 from below 4 and the deviation is somewhat smoother. Standard deviation for n_workers=4 and len_iterable=500 drops from 0.5233 for rf_pool1 to 0.4115 for rf_pool2.

figure2

Eventually, increasing chunksize by 1 has the effect that the last task transmitted only has a size of len_iterable % chunksize or chunksize.

The more interesting and, as we will see later, more consequential effect of the extra-treatment however can be observed for the number of generated chunks (n_chunks). For long enough iterables, Pool's completed chunksize-algorithm (n_pool2 in the figure below) will stabilize the number of chunks at n_chunks == n_workers * 4. In contrast, the naive algorithm (after an initial burp) keeps alternating between n_chunks == n_workers and n_chunks == n_workers + 1 as the length of the iterable grows.

figure3

Below you will find two enhanced info-functions for Pool's and the naive chunksize-algorithm. The output of these functions will be needed in the next chapter.

# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )
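A quick usage example (the numbers follow directly from the function above):

>>> calc_chunksize_info(n_workers=4, len_iterable=500)
Chunkinfo(n_workers=4, len_iterable=500, n_chunks=16, chunksize=32, last_chunk=20)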

Don't be confused by the probably unexpected look of calc_naive_chunksize_info. The extra from divmod is not used for calculating the chunksize.

def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

6. Quantifying Algorithm Efficiency

Now, after we have seen how the output of Pool's chunksize-algorithm looks different compared to the output from the naive algorithm...

  • How to tell if Pool's approach actually improves something?
  • And what exactly could this something be?

As shown in the previous chapter, for longer iterables (a bigger number of taskels), Pool's chunksize-algorithm approximately divides the iterable into four times more chunks than the naive method. Smaller chunks mean more tasks and more tasks mean more Parallelization Overhead (PO), a cost which must be weighed against the benefit of increased scheduling-flexibility (recall "Risks of Chunksize>1").

For rather obvious reasons, Pool's basic chunksize-algorithm cannot weigh scheduling-flexibility against PO for you. IPC-overhead is OS-, hardware- and data-size dependent. The algorithm cannot know on what hardware we run our code, nor does it have a clue how long a taskel will take to finish. It's a heuristic providing basic functionality for all possible scenarios. This means it cannot be optimized for any scenario in particular. As mentioned before, PO also becomes increasingly less of a concern with increasing computation times per taskel (negative correlation).

When you recall the Parallelization Goals from chapter 2, one bullet-point was:

  • high utilization across all cpu-cores

A recurring question on SO regarding multiprocessing is asked by people wondering about unused cores / idling worker-processes in situations where you would expect all worker-processes to be busy.

Idling worker-processes towards the end of our computation is an observation we can make even with Dense Scenarios (totally equal computation times per taskel) in cases where the number of workers is not a divisor of the number of chunks (n_chunks % n_workers > 0).

A bigger number of chunks means an increased chance that the number of workers will be a divisor of n_chunks, hence the chance of not observing idling workers improves accordingly.

For reasons mentioned before, the PO aspect completely stays out of scope from theoretical considerations about measuring algorithm efficiency, at least in an initial step. The previously mentioned something, Pool's chunksize-algorithm can try to improve is the minimization of idling worker-processes, respectively the utilization of cpu-cores.

The value quantifying the rate of worker-utilization I'm going to refer to as:

Distribution Efficiency (DE)

Our original condition for a formalization of the problem, the stable state, is a Dense Scenario with totally equal computation times per taskel. Every other scenario would be random/chaos and not suited for a ceteris paribus investigation. Further chaos factors like OS thread-scheduling policy are also not taken into consideration.

It's important to note that DE, in the sense I'm using the term here, does not automatically correlate with faster overall computation for a given parallelization problem. DE doesn't tell us if the workers are productively busy or if most of the time is wasted on handling overhead. It only tells us the percentage of worker utilization in the sense of an absence of idling workers - and it does so only for our simplified model.


6.1 Absolute Distribution Efficiency (ADE)

While thinking about how I would actually be able to quantify a possible advantage of Pool's chunksize-algorithm over the naive chunksize-algorithm, I imagined a picture of Pool's worker-scheduling like you see it below.

figure4

  • The x-axis is sectioned into equal units of time, where each unit stands for the computation time a taskel requires.
  • The y-axis is divided into the number of worker-processes the pool uses.
  • A taskel here is displayed as the smallest cyan-colored rectangle, put into a timeline (a schedule) of an anonymized worker-process.
  • A task is one or multiple taskels in a worker-timeline continuously highlighted with the same hue.
  • Idling time units are represented through red colored tiles.
  • The Parallel Schedule is partitioned into sections. The last section is the tail-section.

The names for the composed parts can be seen in the picture below.

figure5

Distribution Efficiency then gets calculated by dividing the Busy Share by the full potential of the Parallel Schedule:

Absolute Distribution Efficiency (ADE) = Busy Share / Parallel Schedule

Here is how this looks in code:

# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rde`.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade
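Feeding it the Chunkinfo example from above gives (the value follows directly from the code):

>>> ci = calc_chunksize_info(n_workers=4, len_iterable=500)
>>> calc_ade(*ci)
0.9765625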

If there is no Idling Share, Busy Share will be equal to Parallel Schedule, hence we get an ADE of 100%. In our simplified model, this is a scenario where all available processes will be busy through the whole time needed for processing all tasks. In other words, the whole job effectively gets parallelized to 100 percent.

But why do I keep referring to DE as absolute DE here?

To comprehend that, we have to consider a possible case for the chunksize (cs) which ensures maximal scheduling flexibility (also, the number of Highlanders there can be. Coincidence?):

___________________________________~ ONE ~___________________________________

If we, for example, have four worker-processes and 37 taskels, there will be idling workers even with chunksize=1, just because n_workers=4 is not a divisor of 37: the single remaining taskel has to be processed by a sole worker while the other three idle.


Dar*_*aut 23

About this answer

This answer is Part II of the accepted answer above.


7. Naive vs. Pool's Chunksize-Algorithm

Before going into details, consider the two GIFs below. For a range of different iterable lengths they show how the two compared algorithms chunk the passed iterable (it will be a sequence by then) and how the resulting tasks might be distributed. The order of workers is random, and the number of distributed tasks per worker can in reality differ from these images for light taskels and/or taskels in a Wide Scenario. As mentioned earlier, overhead is also not included here. For heavy enough taskels in a Dense Scenario with negligible transmitted data-sizes, real computations draw a very similar picture, though.

cs_4_50

cs_200_250

As shown in chapter "5. Pool's Chunksize-Algorithm", with Pool's chunksize-algorithm the number of chunks will stabilize at n_chunks == n_workers * 4 for big enough iterables, while it keeps switching between n_chunks == n_workers and n_chunks == n_workers + 1 with the naive approach. For the naive algorithm applies: because n_chunks % n_workers == 1 is True for n_chunks == n_workers + 1, a new section will be created in which only a single worker is employed.

Naive Chunksize-Algorithm:

You might think you created tasks in the same number as there are workers, but this will only be true for cases where there is no remainder for len_iterable / n_workers. If there is a remainder, there will be a new section with only one task for a single worker. At that point your computation will not be parallel anymore.
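For instance, with calc_naive_chunksize_info from the first part (the numbers follow directly from that code):

>>> calc_naive_chunksize_info(n_workers=4, len_iterable=41)
Chunkinfo(n_workers=4, len_iterable=41, n_chunks=5, chunksize=10, last_chunk=1)
# five chunks for four workers: the last chunk forms a second section
# in which only a single worker is busy with one remaining taskel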

Below you see a figure similar to the one shown in chapter 5, but displaying the number of sections instead of the number of chunks. For Pool's complete chunksize-algorithm (n_pool2), n_sections will stabilize at the infamous, hard-coded factor 4. For the naive algorithm, n_sections will alternate between one and two.

figure10

For Pool's chunksize-algorithm, the stabilization at n_chunks = n_workers * 4 through the before-mentioned extra-treatment prevents the creation of a new section here and keeps the Idling Share limited to one worker for long enough iterables. Not only that, but the algorithm will keep shrinking the relative size of the Idling Share, which leads to an RDE value converging towards 100%.

"Long enough" for n_workers=4 is len_iterable=210, for example. For iterables equal to or bigger than that, the Idling Share will be limited to one worker, a trait originally lost because of the 4-multiplication within the chunksize-algorithm in the first place.

figure11

The naive chunksize-algorithm also converges towards 100%, but it does so more slowly. The converging effect solely depends on the fact that the relative portion of the tail shrinks for cases where there are two sections. This tail with only one employed worker is limited to the x-axis length n_workers - 1, the maximal possible remainder for len_iterable / n_workers.

How do actual RDE values differ between the naive and Pool's chunksize-algorithm?

Below you find two heatmaps showing the RDE values for all iterable lengths up to 5000, for all numbers of workers from 2 up to 100. The color-scale goes from 0.5 to 1 (50%-100%). You will notice much darker areas (lower RDE values) for the naive algorithm in the left heatmap. In contrast, Pool's chunksize-algorithm on the right draws a much sunnier picture.

figure12

The diagonal gradient of the lower-left dark corners vs. the upper-right bright corners again shows the dependence on the number of workers for what counts as a "long iterable".

How bad can it get with each algorithm?

With Pool's chunksize-algorithm an RDE value of 81.25% is the lowest value for the range of workers and iterable lengths specified above:

figure13

With the naive chunksize-algorithm, things can get much worse. The lowest calculated RDE here is 50.72%. In this case, for nearly half of the computation time only a single worker is running! So, watch out, proud owners of Knights Landing. ;)

figure14


8. Reality Check

In the previous chapters we considered a simplified model of the purely mathematical distribution problem, stripped of the nitty-gritty details that make multiprocessing such a thorny topic in the first place. To better understand how far the Distribution Model (DM) alone can contribute to explaining observed worker-utilization in reality, we will now take a look at Parallel Schedules drawn from real computations.

Setup

The following plots all deal with parallel executions of a simple, cpu-bound dummy-function, which gets called with various arguments so we can observe how the drawn Parallel Schedule varies depending on the input values. The "work" within this function consists only of iteration over a range object. This is already enough to keep a core busy since we pass in huge numbers. Optionally the function takes some taskel-unique extra data which is just returned unchanged. Since every taskel comprises exactly the same amount of work, we are still dealing with a Dense Scenario here.

The function is decorated with a wrapper taking timestamps with ns-resolution (Python 3.7+). The timestamps are used to calculate the timespan of a taskel and therefore enable the drawing of an empiric Parallel Schedule.

from functools import wraps
from multiprocessing import current_process
from time import time_ns


def stamp_taskel(func):
    """Decorator for taking timestamps on start and end of decorated
    function execution.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time_ns()
        result = func(*args, **kwargs)
        end_time = time_ns()
        return (current_process().name, (start_time, end_time)), result
    return wrapper


@stamp_taskel
def busy_foo(i, it, data=None):
    """Dummy function for CPU-bound work."""
    for _ in range(int(it)):
        pass
    return i, data

Pool's starmap method is also decorated, in a way that only the starmap-call itself is timed. "Start" and "end" of this call determine the minimum and maximum on the x-axis of the produced Parallel Schedule.

We're going to observe the computation of 40 taskels on four worker-processes on a machine with these specs: Python 3.7.1, Ubuntu 18.04.2, Intel® Core™ i7-2600K CPU @ 3.40GHz × 8

The input values which will be varied are the number of iterations in the for-loop (30k, 30M, 600M) and the additionally sent data size (per taskel, numpy-ndarray: 0 MiB, 50 MiB).

...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3  # 30e6, 600e6
DATA_MiB = 0  # 50

iterable = [
    # extra created data per taskel
    (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8)))  # taskel args
    for i in range(LEN_ITERABLE)
]


with Pool(N_WORKERS) as pool:
    results = pool.starmap(busy_foo, iterable)

The runs shown below were handpicked to have the same ordering of chunks, so you can better spot the differences compared with the Parallel Schedule from the Distribution Model, but don't forget that the order in which the workers get their tasks is non-deterministic.

DM Prediction

To reiterate, the Distribution Model "predicts" a Parallel Schedule like the one we've already seen in chapter 6.2:

figure15

1st RUN: 30k iterations & 0 MiB data per taskel

figure16

Our first run here is very short, the taskels are very "light". The whole pool.starmap()-call took only 14.5 ms in total. You will notice that, contrary to the DM, the idling is not restricted to the tail-section, but also takes place between tasks and even between taskels. That's because our real schedule here naturally includes all sorts of overhead. Idling here means everything outside of a taskel. Possible real idling during a taskel is not captured, as already mentioned before.

Further you can see that not all workers get their taskels at the same time. That's due to the fact that all workers are fed over a shared inqueue and only one worker can read from it at a time. The same applies for the outqueue. This can cause bigger upsets as soon as you're transmitting non-marginal sizes of data, as we will see later.

Furthermore you can see that, despite the fact that every taskel comprises the same amount of work, the actual measured timespan for a taskel varies greatly. The taskels distributed to worker-3 and worker-4 need more time than the ones processed by the first two workers. For this run I suspect that this is due to Turbo Boost not being available anymore on the cores for worker-3/4 at that moment, so they processed their tasks at a lower clock-rate.

The whole computation is so light that hardware- or OS-introduced chaos-factors can skew the PS drastically. The computation is a "leaf in the wind" and the DM-prediction has little significance, even for a theoretically fitting scenario.

2nd RUN: 30M iterations & 0 MiB data per taskel

figure17

Increasing the number of iterations in the for-loop from 30,000 to 30 million results in a real Parallel Schedule which is a close match with the one predicted from the data provided by the DM! The computation per taskel is now heavy enough to marginalize the idling parts at the start and in between, leaving only the big Idling Share visible which the DM predicted.

3rd RUN: 30M iterations & 50 MiB data per taskel

figure18

Keeping the 30M iterations, but additionally sending 50 MiB per taskel back and forth, skews the picture again. Here the queueing-effect is clearly visible. Worker-4 needs to wait longer for its second task than Worker-1. Now imagine this schedule with 70 workers!

In case the taskels are computationally very light but come with a notable amount of data as payload, the bottleneck of a single shared queue can prevent any additional benefit of adding more workers to the Pool, even if they are backed by physical cores. In such a case, Worker-1 could be done with its first task and awaiting a new one even before Worker-40 has gotten its first task.

It should now be obvious why computation times in a Pool don't always decrease linearly with the number of workers. Sending relatively big amounts of data along can lead to scenarios where most of the time is spent waiting for the data to be copied into the address space of a worker, and only one worker can be fed at a time.

4th RUN: 600M iterations & 50 MiB data per taskel

figure19

Here we send 50 MiB again, but raise the number of iterations from 30M to 600M, which brings the total computation time up from 10 s to 152 s. The drawn Parallel Schedule again is a close match with the predicted one; the overhead through the data copying is marginalized.


9. Conclusion

The discussed multiplication by 4 increases scheduling flexibility, but also leverages the unevenness in taskel-distributions. Without this multiplication, the Idling Share would be limited to a single worker even for short iterables (for a DM with Dense Scenario). Pool's chunksize-algorithm needs input-iterables to be of a certain size to regain that trait.

As this answer has hopefully shown, Pool's chunksize-algorithm leads to a better core utilization on average compared to the naive approach, at least for the average case and as long as overhead is not considered. The naive algorithm here can have a Distribution Efficiency (DE) as low as ~51%, while Pool's chunksize-algorithm has its low at ~81%. DE however doesn't comprise Parallelization Overhead (PO) like IPC. Chapter 8 has shown that DE can still have great predictive power for a Dense Scenario with marginalized overhead.

Despite the fact that Pool's chunksize-algorithm achieves a higher DE compared to the naive approach, it does not provide optimal taskel distributions for every input constellation. While a simple static chunking-algorithm cannot optimize (overhead-including) Parallelization Efficiency (PE), there is no inherent reason why it could not always provide a Relative Distribution Efficiency (RDE) of 100%, that means, the same DE as with chunksize=1. A simple chunksize-algorithm consists only of basic math and is free to "slice the cake" in any way.

Unlike Pool's implementation of an "equal-size-chunking" algorithm, an "even-size-chunking" algorithm would provide an RDE of 100% for every len_iterable / n_workers combination. An even-size-chunking algorithm would be slightly more complicated to implement in Pool's source, but can be modulated on top of the existing algorithm just by packaging the tasks externally (I'll link from here in case I drop a Q/A on how to do that).
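A minimal sketch of what such an even-size-chunking computation could look like (my own illustration, not Pool's code; calc_even_chunk_sizes is a made-up name):

def calc_even_chunk_sizes(len_iterable, n_chunks):
    """Split len_iterable taskels into n_chunks chunks whose sizes
    differ by at most one taskel."""
    base, extra = divmod(len_iterable, n_chunks)
    return [base + 1] * extra + [base] * (n_chunks - extra)

print(calc_even_chunk_sizes(37, 4))   # [10, 9, 9, 9]
# "equal-size" chunking with chunksize=10 would yield [10, 10, 10, 7] instead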


小智 5

I think the part you're missing is that your naive estimate assumes that each unit of work takes the same amount of time, in which case your strategy would be the best. But if some jobs finish sooner than others, then some cores may become idle waiting for the slow jobs to finish.

Thus, by breaking the chunks up into 4 times more pieces, if one chunk finishes early, that core can start the next chunk (while the other cores keep working on their slower chunks).

I don't know why they picked the factor of 4 exactly, but it would be a trade-off between minimizing the overhead of the map code (which wants the largest possible chunks) and balancing chunks taking different amounts of time (which wants the smallest possible chunks).