是否多次使用std :: async对小任务表现友好?

Tho*_*ell 21 c++ concurrency performance asynchronous c++11

为了给出一些背景信息,我正在处理一个保存的文件,在使用正则表达式将文件拆分成它的组件对象之后,我需要根据它的对象类型来处理对象的数据.

我目前的想法是使用并行性来获得一点性能提升,因为加载每个对象是相互独立的.所以我要定义一个LoadObject函数接受一个std::string我将要处理的每种类型的对象,然后调用std::async如下:

void LoadFromFile( const std::string& szFileName )
{
     static const std::regex regexObject( "=== ([^=]+) ===\\n((?:.|\\n)*)\\n=== END \\1 ===", std::regex_constants::ECMAScript | std::regex_constants::optimize );

     std::ifstream inFile( szFileName );
     inFile.exceptions( std::ifstream::failbit | std::ifstream::badbit );

     std::string szFileData( (std::istreambuf_iterator<char>(inFile)), (std::istreambuf_iterator<char>()) );

     inFile.close();

     std::vector<std::future<void>> vecFutures;

     for( std::sregex_iterator itObject( szFileData.cbegin(), szFileData.cend(), regexObject ), end; itObject != end; ++itObject )
     {
          // Determine what type of object we're loading:
          if( (*itObject)[1] == "Type1" )
          {
               vecFutures.emplace_back( std::async( LoadType1, (*itObject)[2].str() ) );
          }
          else if( (*itObject)[1] == "Type2" )
          {
               vecFutures.emplace_back( std::async( LoadType2, (*itObject)[2].str() ) );
          }
          else
          {
               throw std::runtime_error( "Unexpected type encountered whilst reading data file." );
          }
     }

     // Make sure all our tasks completed:
     for( auto& future : vecFutures )
     {
           future.get();
     }
}
Run Code Online (Sandbox Code Playgroud)

请注意,应用程序中将有两种以上的类型(这只是一个简短的示例),并且可能要读取文件中的数千个对象.

我知道,由于上下文切换超过最大硬件并发性,创建太多线程通常对性能不利,但如果我的内存正确地为我服务,那么C++运行时应该监视创建的线程数并std::async适当地安排(我相信在微软的情况下他们的ConcRT库负责这个?),所以上面的代码仍然可以带来性能提升?

提前致谢!

bam*_*s53 16

C++运行时应该监视创建的线程数并适当地调度std :: async

不可以.如果异步任务实际上是异步运行(而不是延迟),那么所需要的只是在新线程上运行它们.对于为每个任务创建和启动的新线程完全有效,而不考虑硬件的并行容量有限.

有一个说明:

[注意:如果此策略与其他策略一起指定,例如使用launch :: async |的策略值时 launch :: deferred,当不再能够有效利用并发时,实现应该推迟调用或选择策略. - 尾注]

但是,这是非规范性的,并且无论如何它表明一旦不再可以利用并发性,任务可能会被延迟,因此当有人等待结果时执行,而不是仍然是异步并且在其中一个之后立即运行完成先前的异步任务,这对于最大并行性是期望的.

也就是说,如果我们有10个长时间运行的任务,并且实现只能并行执行4个,那么前4个将是异步的,然后最后6个可能会被延迟.按顺序等待期货将按顺序在单个线程上执行延期任务,从而消除这些任务的并行执行.

该说明还指出,可以推迟选择策略,而不是推迟调用.也就是说,该函数可能仍然异步运行,但该决定可能会延迟,例如,直到其中一个早期任务完成,从而为新任务释放核心.但同样,这不是必需的,该注释是非规范性的,据我所知,微软的实现是唯一一种行为方式.当我查看另一个实现libc ++时,它完全忽略了这个注释,因此使用其中一个std::launch::async或多个std::launch::any策略会导致在新线程上执行异步执行.

(我相信微软的案例,他们的ConcRT库负责这个?)

Microsoft的实现确实与您描述的一样,但这不是必需的,并且可移植程序不能依赖于该行为.

可移植地限制实际运行的线程数的一种方法是使用类似信号量的东西:

#include <future>
#include <mutex>
#include <cstdio>

// a semaphore class
//
// All threads can wait on this object. When a waiting thread
// is woken up, it does its work and then notifies another waiting thread.
// In this way only n threads will be be doing work at any time.
// 
class Semaphore {
private:
    std::mutex m;
    std::condition_variable cv;
    unsigned int count;

public:
    Semaphore(int n) : count(n) {}
    void notify() {
        std::unique_lock<std::mutex> l(m);
        ++count;
        cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this]{ return count!=0; });
        --count;
    }
};

// an RAII class to handle waiting and notifying the next thread
// Work is done between when the object is created and destroyed
class Semaphore_waiter_notifier {
    Semaphore &s;
public:
    Semaphore_waiter_notifier(Semaphore &s) : s{s} { s.wait(); }
    ~Semaphore_waiter_notifier() { s.notify(); }
};

// some inefficient work for our threads to do
int fib(int n) {
    if (n<2) return n;
    return fib(n-1) + fib(n-2);
}

// for_each algorithm for iterating over a container but also
// making an integer index available.
//
// f is called like f(index, element)
template<typename Container, typename F>
F for_each(Container &c, F f) {
    Container::size_type i = 0;
    for (auto &e : c)
        f(i++, e);
    return f;
}

// global semaphore so that lambdas don't have to capture it
Semaphore thread_limiter(4);

int main() {
    std::vector<int> input(100);
    for_each(input, [](int i, int &e) { e = (i%10) + 35; });

    std::vector<std::future<int>> output;
    for_each(input, [&output](int i, int e) {
        output.push_back(std::async(std::launch::async, [] (int task, int n) -> int {
            Semaphore_waiter_notifier w(thread_limiter);
            std::printf("Starting task %d\n", task);
            int res = fib(n);
            std::printf("\t\t\t\t\t\tTask %d finished\n", task);
            return res;
        }, i, e));
    });

    for_each(output, [](int i, std::future<int> &e) {
        std::printf("\t\t\tWaiting on task %d\n", i);
        int res = e.get();
        std::printf("\t\t\t\t\t\t\t\t\tTask %d result: %d\n", i, res);
    });
}
Run Code Online (Sandbox Code Playgroud)