并行循环中的惰性向量访问

Enz*_*ano 6 c++ parallel-processing openmp c++17

性能关键的并行代码中,我有一个向量,其元素是:

  • 计算起来非常昂贵,结果是确定性的(给定位置的元素值仅取决于位置)
  • 随机访问(通常访问次数大于或大于向量的大小)
  • 集群访问(许多访问请求相同的值)
  • 向量由不同的线程共享(竞争条件?)
  • 为避免堆碎片整理,永远不应重新创建对象,但只要可能,就可以重置和回收
  • 放置在向量中的值将由多态对象提供

目前,我预先计算了向量的所有可能值,因此竞争条件不应成为问题.为了提高性能,我正在考虑创建一个惰性向量,这样代码只在请求向量元素时执行计算.在并行区域中,可能会发生多个线程同时请求并且可能同时计算相同元素的情况.我如何处理这种可能的竞争条件?

下面是我想要实现的一个例子.它在Windows 10,Visual Studio 17下编译和运行.我使用C++ 17.

// Lazy.cpp : Defines the entry point for the console application.
#include "stdafx.h"
#include <vector>
#include <iostream>
#include <stdlib.h>  
#include <chrono>
#include <math.h>
const double START_SUM = 1;
const double END_SUM = 1000;

//base object responsible for providing the values
class Evaluator
{
public:
    Evaluator() {};
    ~Evaluator() {};
    //Function with deterministic output, depending on the position
    virtual double expensiveFunction(int pos) const = 0;
};
//
class EvaluatorA: public Evaluator
{
public:
    //expensive evaluation
    virtual double expensiveFunction(int pos) const override {
        double t = 0;
        for (int j = START_SUM; j++ < END_SUM; j++)
            t += log(exp(log(exp(log(j + pos)))));
        return t;
    }
    EvaluatorA() {};
    ~EvaluatorA() {};
};
class EvaluatorB : public Evaluator
{
public:
    //even more expensive evaluation
    virtual double expensiveFunction(int pos) const override {
        double t = 0;
        for (int j = START_SUM; j++ < 10*END_SUM; j++)
            t += log(exp(log(exp(log(j + pos)))));
        return t;
    }
    EvaluatorB() {};
    ~EvaluatorB() {};
};

class LazyVectorTest //vector that contains N possible results
{
public:
    LazyVectorTest(int N,const Evaluator & eval) : N(N), innerContainer(N, 0), isThatComputed(N, false), eval_ptr(&eval)
    {};
    ~LazyVectorTest() {};

    //reset, to generate a new table of values
    //the size of the vector stays constant
    void reset(const Evaluator & eval) {
        this->eval_ptr = &eval;
        for (int i = 0; i<N; i++)
            isThatComputed[i] = false;
    }
    int size() { return N; }
    //accessing the same position should yield the same result
    //unless the object is resetted
    const inline double& operator[](int pos) {
        if (!isThatComputed[pos]) {
            innerContainer[pos] = eval_ptr->expensiveFunction(pos);
            isThatComputed[pos] = true;
        }
        return innerContainer[pos];
    }

private:
    const int N;
    const Evaluator* eval_ptr;
    std::vector<double> innerContainer;
    std::vector<bool> isThatComputed;
};
    //the parallel access will take place here

template <typename T>
double accessingFunction(T& A, const std::vector<int>& elementsToAccess) {
    double tsum = 0;
    int size = elementsToAccess.size();
//#pragma omp parallel for
    for (int i = 0; i < size; i++)
        tsum += A[elementsToAccess[i]];
    return tsum;
}

std::vector<int> randomPos(int sizePos, int N) {
    std::vector<int> elementsToAccess;
    for (int i = 0; i < sizePos; i++)
        elementsToAccess.push_back(rand() % N);
    return elementsToAccess;
}
int main()
{
    srand(time(0));
    int minAccessNumber = 1;
    int maxAccessNumber = 100;
    int sizeVector = 50;

    auto start = std::chrono::steady_clock::now();
    double res = 0;
    float numberTest = 100;
    typedef LazyVectorTest container;

    EvaluatorA eval;
    for (int i = 0; i < static_cast<int>(numberTest); i++) {
        res = eval.expensiveFunction(i);
    }
    auto end = std::chrono::steady_clock::now();
    std::chrono::duration<double, std::milli>diff(end - start);
    double benchmark = diff.count() / numberTest;
    std::cout <<"Average time to compute expensive function:" <<benchmark<<" ms"<<std::endl;
    std::cout << "Value of the function:" << res<< std::endl;
    std::vector<std::vector<int>> indexs(numberTest);
    container A(sizeVector, eval);

    for (int accessNumber = minAccessNumber; accessNumber < maxAccessNumber; accessNumber++) {
        indexs.clear();
        for (int i = 0; i < static_cast<int>(numberTest); i++) {
            indexs.emplace_back(randomPos(accessNumber, sizeVector));
        }
        auto start_lazy = std::chrono::steady_clock::now();
        for (int i = 0; i < static_cast<int>(numberTest); i++) {
            A.reset(eval);
            double res_lazy = accessingFunction(A, indexs[i]);
        }
        auto end_lazy = std::chrono::steady_clock::now();
        std::chrono::duration<double, std::milli>diff_lazy(end_lazy - start_lazy);
        std::cout << accessNumber << "," << diff_lazy.count() / numberTest << ", " << diff_lazy.count() / (numberTest* benchmark) << std::endl;
    }
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

Zul*_*lan 1

您当前的代码有问题,主要是因为std::vector<bool> 太糟糕了,而且还缺少原子性和内存一致性。这是完全基于 OpenMP 的解决方案的草图。我建议实际上对丢失的条目进行特殊标记,而不是单独标记vector<bool>- 它使一切变得更加容易:

class LazyVectorTest //vector that contains N possible results
{
public:
    LazyVectorTest(int N,const Evaluator & eval) : N(N), innerContainer(N, invalid), eval_ptr(&eval)
    {};
    ~LazyVectorTest() {};

    //reset, to generate a new table of values
    //the size of the vector stays constant
    void reset(const Evaluator & eval) {
        this->eval_ptr = &eval;
        for (int i = 0; i<N; i++) {
            // Use atomic if that could possible be done in parallel
            // omit that for performance if you doun't ever run it in parallel
            #pragma omp atomic write
            innerContainer[i] = invalid;
        }
        // Flush to make sure invalidation is visible to all threads
        #pragma omp flush
    }
    int size() { return N; }
    // Don't return a reference here
    double operator[] (int pos) {
        double value;
        #pragma omp atomic read
        value = innerContainer[pos];
        if (value == invalid) {
            value = eval_ptr->expensiveFunction(pos);
            #pragma omp atomic write
            innerContainer[pos] = value;
        }
        return value;
    }

private:
    // Use nan, inf or some random number - doesn't really matter
    static constexpr double invalid = std::nan("");
    const int N;
    const Evaluator* eval_ptr;
    std::vector<double> innerContainer;
};
Run Code Online (Sandbox Code Playgroud)

如果发生冲突,其他线程将只是冗余地计算该值。- 利用确定性本质。我omp atomic在元素的读取和写入上使用,可以确保不会读取不一致的“半写”值。

对于罕见的不良情况,此解决方案可能会产生一些额外的延迟。反过来,好的情况是最优的,只需一次原子读取。你甚至不需要任何内存flushes / seq_cst- 最坏的情况是冗余计算。如果单独编写标志和值,则需要这些(顺序一致性),以确保更改可见的顺序正确。