nbo*_*eel 6 c++ windows multithreading threadpool
[编辑:感谢MSalters的回答和Raymond Chen对InterlockedIncrement与EnterCriticalSection/counter ++/LeaveCriticalSection的回答,问题解决了,下面的代码工作正常.这应该提供一个有趣的简单示例,在Windows中使用线程池]
我无法找到以下任务的简单示例.例如,我的程序需要将一个巨大的std :: vector中的值递增一,所以我想并行执行.它需要在程序的整个生命周期中多次执行此操作.我知道如何在每次调用例程时使用CreateThread,但我没有设法使用ThreadPool去掉CreateThread.
这是我做的:
class Thread {
public:
Thread(){}
virtual void run() = 0 ; // I can inherit an "IncrementVectorThread"
};
class IncrementVectorThread: public Thread {
public:
IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { };
virtual void run() {
for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
myvec[i]++; //and let's assume myvec is properly sized
}
int id, nb;
std::vector<int> &myvec;
};
class ThreadGroup : public std::vector<Thread*> {
public:
ThreadGroup() {
pool = CreateThreadpool(NULL);
InitializeThreadpoolEnvironment(&cbe);
cleanupGroup = CreateThreadpoolCleanupGroup();
SetThreadpoolCallbackPool(&cbe, pool);
SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
threadCount = 0;
}
~ThreadGroup() {
CloseThreadpool(pool);
}
PTP_POOL pool;
TP_CALLBACK_ENVIRON cbe;
PTP_CLEANUP_GROUP cleanupGroup;
volatile long threadCount;
} ;
static VOID CALLBACK runFunc(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PTP_WORK Work) {
ThreadGroup &thread = *((ThreadGroup*) Context);
long id = InterlockedIncrement(&(thread.threadCount));
DWORD tid = (id-1)%thread.size();
thread[tid]->run();
}
void run_threads(ThreadGroup* thread_group) {
SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());
TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
thread_group->threadCount = 0;
for (int i=0; i<thread_group->size(); i++) {
SubmitThreadpoolWork(worker);
}
WaitForThreadpoolWorkCallbacks(worker,FALSE);
CloseThreadpoolWork(worker);
}
void main() {
ThreadGroup group;
std::vector<int> vec(10000, 0);
for (int i=0; i<10; i++)
group.push_back(new IncrementVectorThread(i, 10, vec));
run_threads(&group);
run_threads(&group);
run_threads(&group);
// now, vec should be == std::vector<int>(10000, 3);
}
Run Code Online (Sandbox Code Playgroud)
所以,如果我理解得很好:
- 命令CreateThreadpool创建了一堆线程(因此,对CreateThreadpoolWork的调用很便宜,因为它不调用CreateThread)
- 我可以拥有尽可能多的线程池(如果我想要的话)一个用于"IncrementVector"的线程池,一个用于我的"DecrementVector"线程,我可以).
- 如果我需要将"增量向量"任务划分为10个线程,而不是调用10次CreateThread,我创建一个"worker",并使用相同的参数将其提交给ThreadPool 10次(因此,我需要该线程回调中的ID,知道我的std :: vector的哪一部分要递增).在这里我找不到线程ID,因为函数GetCurrentThreadId()返回线程的真实ID(即,类似于1528,而不是0..nb_launched_threads之间的东西).
最后,我不确定我是否理解这个概念:如果我将std :: vector分成10个线程,我真的需要一个单独的工作者而不是10个吗?
谢谢!
你大致直到最后一点.
关于线程池的整个想法是你不关心它有多少线程.您只需将大量工作投入到线程池中,然后让操作系统确定如何执行每个块.因此,如果您创建并提交10个块,则操作系统可能会使用池中的1到10个线程.
你不应该关心那些线程身份.不要打扰线程ID,最小或最大线程数或类似的东西.
如果您不关心线程标识,那么如何管理要更改的向量的哪个部分?简单.在创建线程池之前,将计数器初始化为零.在回调函数中,调用InterlockedIncrement
检索并递增计数器.对于每个提交的工作项,您将获得一个连续的整数.