为什么这些goroutine不能通过更多的并发执行来扩展它们的性能?

bat*_*ast 5 parallel-processing concurrency optimization performance go

背景

我目前正在研究我的学士论文,基本上我的任务是优化Go中的给定代码,即尽可能快地运行.首先,我优化了串行功能,然后尝试通过goroutines引入并行性.在互联网上进行研究之后,由于来自talks.golang的以下幻灯片,我现在理解了并发性和并行性之间的区别.我参观了一些并行编程课程,我们在pthread/openmp的帮助下并行化了ac/c ++代码,因此我尝试在Go中应用这些范例.也就是说,在这个特殊情况下,我正在优化一个函数,它计算一个长度的切片的移动平均值len:=n+(window_size-1)(它等于9393或10175),因此我们有n窗口,我们计算相应的算术平均值,并在输出中正确保存切片.

请注意,此任务本质上是令人尴尬的并行.

我的优化尝试和结果

moving_avg_concurrent2我将切片分成num_goroutines小片并用一个goroutine运行.这个功能是用一个goroutine执行的,出于某种原因(但是我们在这里找不到原因,但是我们在这里得到了切线),moving_avg_serial4但是比一个以上的goroutine它开始表现更差moving_avg_serial4.
moving_avg_concurrent3我所采用的主/工人模式.性能比moving_avg_serial4使用一个goroutine时差.在这里,我们至少在增加时获得了更好的表现,num_goroutines但仍然没有更好moving_avg_serial4.为了比较的性能moving_avg_serial4,moving_avg_concurrent2moving_avg_concurrent3我写了一个标杆和我列出的结果:

fct & num_goroutines | timing in ns/op | percentage  
---------------------------------------------------------------------   
          serial4    |         4357893 |   100.00%  
          concur2_1  |         5174818 |   118.75%  
          concur2_4  |         9986386 |   229.16%  
          concur2_8  |        18973443 |   435.38%  
          concur2_32 |        75602438 |  1734.84%  
          concur3_1  |        32423150 |   744.01%  
          concur3_4  |        21083897 |   483.81%  
          concur3_8  |        16427430 |   376.96%  
          concur3_32 |        15157314 |   347.81%  
Run Code Online (Sandbox Code Playgroud)

因为如上所述,这个问题令人尴尬地平行,我期待看到性能的巨大提升但事实并非如此.

为什么不进行moving_avg_concurrent2扩展呢?
为什么moving_avg_concurrent3那么慢moving_avg_serial4
我知道goroutines很便宜但仍然不是免费的,但这是否有可能产生这么多开销,以至于我们甚至比它还要慢moving_avg_serial4

功能:

// returns a slice containing the moving average of the input (given, i.e. not optimised)
func moving_avg_serial(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise buffer with NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i, val := range input {
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = val
            if !NaN_in_slice(buffer) && first_time {
                sum := 0.0
                for _, entry := range buffer {
                    sum += entry
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) && !NaN_in_slice(buffer) {
                output[i] = output[i-1] + (val-old_val)/float64(window_size) // solution without loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// reordering the control structures to exploid the short-circuit evaluation
func moving_avg_serial4(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise buffer with NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i := range input {
            //            fmt.Printf("in mvg_avg4: i=%v\n", i)
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i]
            if first_time && !NaN_in_slice(buffer) {
                sum := 0.0
                for j := range buffer {
                    sum += buffer[j]
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) /* && !NaN_in_slice(buffer)*/ {
                output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // solution without loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// splitting up slice into smaller pieces for the goroutines but without using the serial version, i.e. we only have NaN's in the beginning, thus hope to reduce some overhead
// still does not scale (decreasing performance with increasing size and num_goroutines)
func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_items := len(input) - (window_size - 1)
        var barrier_wg sync.WaitGroup
        n := num_items / num_goroutines
        go_avg := make([][]float64, num_goroutines)
        for i := 0; i < num_goroutines; i++ {
            go_avg[i] = make([]float64, 0, num_goroutines)
        }

        for i := 0; i < num_goroutines; i++ {
            barrier_wg.Add(1)
            go func(go_id int) {
                defer barrier_wg.Done()

                // computing boundaries
                var start, stop int
                start = go_id*int(n) + (window_size - 1) // starting index
                // ending index
                if go_id != (num_goroutines - 1) {
                    stop = start + n // Ending index
                } else {
                    stop = num_items + (window_size - 1) // Ending index
                }

                loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size)

                loc_avg = make([]float64, stop-start)
                current_sum := 0.0
                for i := start - (window_size - 1); i < start+1; i++ {
                    current_sum += input[i]
                }
                loc_avg[0] = current_sum / float64(window_size)
                idx := 1

                for i := start + 1; i < stop; i++ {
                    loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size)
                    idx++
                }

                go_avg[go_id] = append(go_avg[go_id], loc_avg...)

            }(i)
        }
        barrier_wg.Wait()

        for i := 0; i < num_goroutines; i++ {
            output = append(output, go_avg[i]...)
        }

    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// change of paradigm, we opt for a master worker pattern and spawn all windows which each will be computed by a goroutine
func compute_window_avg(input, output []float64, start, end int) {
    sum := 0.0
    size := end - start
    for _, val := range input[start:end] {
        sum += val
    }
    output[end-1] = sum / float64(size)
}

func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_windows := len(input) - (window_size - 1)
        var output = make([]float64, len(input))
        for i := 0; i < window_size-1; i++ {
            output[i] = math.NaN()
        }

        pending := make(chan *Work)
        done := make(chan *Work)

        // creating work
        go func() {
            for i := 0; i < num_windows; i++ {
                pending <- NewWork(compute_window_avg, input, output, i, i+window_size)
            }
        }()

        // start goroutines which work through pending till there is nothing left
        for i := 0; i < num_goroutines; i++ {
            go func() {
                Worker(pending, done)
            }()
        }

        // wait till every work is done
        for i := 0; i < num_windows; i++ {
            <-done
        }

        return output

    } else { // empty input
        fmt.Println("moving_avg is panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}
Run Code Online (Sandbox Code Playgroud)

基准:

//############### BENCHMARKS ###############
var import_data_res11 []float64
func benchmarkMoving_avg_serial(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial(BackTest_res.F["Trading DrawDowns"], window)
    }
    import_data_res11 = r
}

var import_data_res14 []float64
func benchmarkMoving_avg_serial4(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial4(BackTest_res.F["Trading DrawDowns"], window)
    }
    import_data_res14 = r
}

var import_data_res16 []float64
func benchmarkMoving_avg_concurrent2(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent2(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
    }
    import_data_res16 = r
}

var import_data_res17 []float64
func benchmarkMoving_avg_concurrent3(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent3(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
    }
    import_data_res17 = r
}



func BenchmarkMoving_avg_serial_261x10(b *testing.B) {
    benchmarkMoving_avg_serial(b, 261*10)
}

func BenchmarkMoving_avg_serial4_261x10(b *testing.B) {
    benchmarkMoving_avg_serial4(b, 261*10)
}


func BenchmarkMoving_avg_concurrent2_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent2_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 8)
}


func BenchmarkMoving_avg_concurrent3_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent3_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 8)
}
//############### BENCHMARKS end ###############
Run Code Online (Sandbox Code Playgroud)

备注:
这是我的第一篇文章,我还在学习,所以任何建设性的批评也是受欢迎的.

use*_*197 5

事实#0:过早的优化工作通常会产生负收益,这
表明它们只是浪费时间和精力


为什么?
单个“错误的” SLOC可能会使性能降低超过+ 37%,
或者可能会提高性能以花费少于基线处理时间的-57%

51.151µs on MA(200) [10000]float64    ~ 22.017µs on MA(200) [10000]int
70.325µs on MA(200) [10000]float64
Run Code Online (Sandbox Code Playgroud)

为什么是[]int-s?
您可以在上面看到它自己-这是HPC / fintech高效子[us]处理策略的基础(并且我们仍然仅就[SERIAL]流程调度而言)。

这一次可以测试任何规模的-而是测试第一(在这里)你自己的实现,在非常相同的比例- MA(200) [10000]float64设置 -并发表您的基线持续时间[us]查看初始工艺性能和比较苹果对苹果,有51.2 [us]要比较的发布阈值。

接下来是更困难的部分:


事实1:此任务并非令人尴尬地并行进行

是的,可以进行移动平均线计算,以便确实使用某种故意灌输的“公正” [CONCURRENT]处理方法(无论是否由于某种错误,某些权威的“建议”,专业盲目性或仅因双重苏格拉底式的无知)(这显然并不意味着在移动平均数学公式中存在的卷积流处理的本质已被忘记[SERIAL]只是一个纯粹的过程,只是由于试图执行)它是在某种程度的“正当” [CONCURRENT]处理中计算出来的。

(顺便说一句。硬计算机科学家和双重领域的书呆子也会在这里反对,Go语言是根据Rob Pike的最佳技能设计而成的,具有并发协程框架,而不是任何真正的[PARALLEL]流程调度,即使Hoare的CSP也是如此-语言工具可在语言概念中使用,可能会添加一些盐和胡椒,并引入进程间通信工具的停止块类型,这将阻止“ just”- [CONCURRENT]代码段进入某些硬连线的CSP-p2p同步。)


事实2:仅在结束时才分发(以任何形式的加速)

性能不佳[SERIAL]不会设置任何标准。在单线程中进行合理数量的性能调整后,只有这样才能受益于分布式(仍然需要支付额外的串行成本,这使Amdahl Law(而不是Overhead-strict -Amdahl Law)进入了游戏)。

如果可以引入如此低水平的附加设置开销,并且仍然实现任何非凡的并行性,并将其扩展到处理不重要[SEQ]部分,那么只有这样才有机会提高处理的有效性能。

不难获得收益,因此,始终将纯粹的成本与理论上的,天真的无用的加速[SEQ]之间的潜在权衡进行基准比较non-[SEQ] / N[PAR]_processes,为此,人们将付出所有附加成本之和的费用[SEQ]-间接费用,因此当且仅当:

(         pure-[SEQ]_processing      [ns]
+       add-on-[SEQ]-setup-overheads [ns]
+        ( non-[SEQ]_processing      [ns] / N[PAR]_processes )
  ) << (  pure-[SEQ]_processing      [ns]
       + ( non-[SEQ]_processing      [ns] / 1 )
         )
Run Code Online (Sandbox Code Playgroud)

如果没有这种喷气式战斗机在身后拥有多余的高度和太阳的优势,就不要尝试进行任何类型的HPC /并行化尝试-他们永远也不会为自己做的<<比智能过程好得多而付出的代价[SEQ]


在此处输入图片说明

结语:在开销严格的阿姆达尔定律互动实验界面上

一部动画价值一百万个字。

一种交互式动画,甚至更好:

因此,
假设一个被测过程同时[SERIAL]具有[PARALLEL]一部分和一部分进度表。

p[PARALLEL]过程持续时间的一小部分~ ( 0.0 .. 1.0 )这样的[SERIAL]部分持续时间不长于( 1 - p )吧?

因此,让我们从这样的测试用例开始进行交互式实验,其中p == 1.0,表示所有这样的过程持续时间都只用了[PARALLEL]一部分,而过程流的初始序列和终止部分(基本上总是[SERIAL])都为零-持续时间( ( 1 - p ) == 0. )

假设系统没有特别的魔力,因此需要花一些实际的步骤来初始化每个[PARALLEL]零件,以便在不同的处理器上运行它( (1), 2, .., N ),因此,如果要求重新组织流程并添加一些开销,则让我们增加一些开销编组+分发+解组所有必要的指令和数据,因此现在可以在N处理器上并行启动和运行预期的进程。

称这些成本o(在这里,为简单起见,最初假定它们是恒定的,并且N对于硅片/ NUMA /分布式基础架构而言,并非总是如此)。

通过单击上方的Epilogue标题,可以打开一个交互式环境,并且可以免费进行自己的实验。

随着p == 1. && o == 0. && N > 1性能急剧上升到目前可实现的[PARALLEL]硬件O / S极限,仍然可以实现单语法例的O / S代码执行(其中,对于工作单元的MPI和类似的depeche模式分配,仍然没有额外的分配成本(如果这样,必须添加大量的[ms],而到目前为止,我们迄今为止最好的[SERIAL]实现显然已经完成了整个工作,而不仅仅是〜22.1 [us])。

但是,除了这种人为乐观的情况以外,这项工作看起来并不便宜,无法有效地并行化。

  • 尝试将其设置为零,而不是的大约0.01%的设置开销成本o,并且该行开始显示即使在最极端的[PARALLEL]情况下(具有p == 1.0),开销感知扩展的性质也有很大不同,并且具有潜在的加速效果大约是最初超理想线性加速情况的一半。

  • 现在,将其p转向更接近现实的事物,在某种程度上比和宾果游戏最初的超理想情况人为设定的地方更少,这就是现实,应该对过程调度进行测试和预先验证。== 1.00 --> { 0.99, 0.98, 0.95 }

这意味着什么?

例如,如果开销(启动+最终加入协程库)所花费的时间超过0.1%实际[PARALLEL]处理部分的持续时间的〜,则将不会有4倍的加速(大约是原始持续时间的1/4)。 )用于5个协程(具有p〜0.95),不超过10倍(持续时间快10倍),适用于20个协程(所有假设系统具有5个CPU内核,分别具有20个CPU内核可用和可用,并且准备好(最好使用O / S级CPU核亲和性映射的进程/线程),以便在整个生命周期中不间断地为所有这些协程服务,从而实现高于预期的加速。

没有足够的可用硬件资源来准备所有这些任务单元(这些任务单元旨在实现[PARALLEL]流程计划的-部分),阻塞/等待状态将引入其他绝对等待状态,并且由此产生的性能会增加这些新的- [SERIAL]阻塞/等待整个过程持续时间的部分以及最初希望加速的部分突然不复存在,并且性能因数远远低于<< 1.00(这意味着有效运行时间是由于阻塞状态比非并行的公正[SERIAL]工作流程)。

对于刚接触新手的实验者来说,这听起来可能很复杂,但是我们可以换个角度来看。在分配的整个过程中,[PARALLEL]已知的任务池已知的总长度不应短于大约a 10 [us],开销限制图显示,1000 x 10 [us][PARALLEL]部分中至少需要大约无阻塞的计算密集型处理以免破坏并行处理的效率。

如果没有足够的“胖”处理,则间接费用(显着高于上述引用的阈值~ 0.1%)会残酷地破坏成功并行处理的净效率(但以相对高的相对成本执行)设置成本与任何N处理器的有限净效果之间的对比,如可用实时图表所示)。

对于分布式计算书呆子来说,毫不奇怪,开销o还伴随着其他依赖关系- N(编组数据BLOB的大小取决于(越多的进程,要花费更多的精力来分发工作包), BLOB,在为下一个目标/ 2..N第C个接收过程中的每一个接收到的跨CSP信号,通道介导的下一个进程接收跨此类设备/资源的分布式BLOB之前,MEM- / IO设备保持阻塞的时间越长。进程间的协调(称其为额外的按事件阻止,将其p进一步降低到最终的理想水平1. )。

因此,现实世界与最初理想化,美好而有希望的世界相距甚远,并且p == 1.0( 1 - p ) == 0.0o == 0.0

从一开始就显而易见的是,如果超过了阈值,则尝试超越阈值,同时越来越差,如果使用已经表现不佳的方法来实现实际开销和扩展并没有帮助一点的话。22.1 [us] [SERIAL][PARALLEL]