ReleaseSemaphore 不释放信号量

Gab*_*iel 2 c++ windows multithreading synchronization semaphore

(简而言之:main() 的 WaitForSingleObject 挂在下面的程序中)。

我正在尝试编写一段代码来调度线程并在它恢复之前等待它们完成。我不是每次都创建线程,这是昂贵的,而是让它们进入睡眠状态。主线程在 CREATE_SUSPENDED 状态下创建 X 个线程。

同步是使用 X 作为 MaximumCount 的信号量完成的。信号量的计数器归零,线程被分派。线程执行一些愚蠢的循环并在它们进入睡眠之前调用 ReleaseSemaphore。然后主线程使用 WaitForSingleObject X 次以确保每个线程完成其工作并处于睡眠状态。然后它循环并再次执行所有操作。

有时程序不退出。当我启动程序时,我可以看到 WaitForSingleObject 挂起。这意味着线程的 ReleaseSemaphore 不起作用。没有什么是 printf'ed 所以据说没有出错。

也许两个线程不应该在完全相同的时间调用 ReleaseSemaphore ,但这会使信号量的目的无效......

我只是不理解它...

感谢同步线程的其他解决方案!

#define TRY  100
#define LOOP 100

HANDLE *ids;
HANDLE semaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{ 
 float x = 1.0f;   
 while(1)
 { 
  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)
   printf(" ReleaseSemaphore error : %d ", GetLastError());
  SuspendThread(ids[(int) lpParameter]);
 }
 return (DWORD)(int)x;
}

int main()
{
 SYSTEM_INFO sysinfo;
 GetSystemInfo( &sysinfo );
 int numCPU = sysinfo.dwNumberOfProcessors;

 semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL);
 ids = new HANDLE[numCPU];

 for (int j=0 ; j<numCPU ; j++)
  ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL);

 for (int j=0 ; j<TRY ; j++)
 {
  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }
  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);
 }
 CloseHandle(semaphore);
 printf("Done\n");
 getc(stdin);
}
Run Code Online (Sandbox Code Playgroud)

Jer*_*fin 5

我一直使用线程安全队列,而不是使用信号量(至少直接使用)或让 main 显式唤醒线程来完成一些工作。当 main 想要一个工作线程做某事时,它会将要完成的工作的描述推送到队列中。每个工作线程只做一个工作,然后尝试从队列中弹出另一个工作,并最终挂起,直到队列中有一个工作让他们做:

队列的代码如下所示:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);       
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    T pop() { 
        WaitForSingleObject(data_avail,INFINITE);
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif
Run Code Online (Sandbox Code Playgroud)

在线程中使用它的粗略等效代码看起来像这样。我没有确切地弄清楚你的线程函数在做什么,但它是求和平方根的东西,显然你对线程同步比目前线程实际做什么更感兴趣。

编辑:(基于评论):如果您需要main()等待某些任务完成,做更多工作,然后分配更多任务,通常最好通过将事件(例如)放入每个任务中来处理,并让您的线程函数设置事件。修改后的代码如下所示(注意队列代码不受影响):

#include "queue.hpp"

#include <iostream>
#include <process.h>
#include <math.h>
#include <vector>

struct task { 
    int val;
    HANDLE e;

    task() : e(CreateEvent(NULL, 0, 0, NULL)) { }
    task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {}
};

void process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p);

    task t;
    while ( -1 != (t=q.pop()).val) {
        std::cout << t.val << "\n";
        SetEvent(t.e);
    }
}

int main() { 
    queue<task> jobs;

    enum { thread_count = 4 };
    enum { task_count = 10 };

    std::vector<HANDLE> threads;
    std::vector<HANDLE> events;

    std::cout << "Creating thread pool" << std::endl;
    for (int t=0; t<thread_count; ++t)
        threads.push_back((HANDLE)_beginthread(process, 0, &jobs));
    std::cout << "Thread pool Waiting" << std::endl;

    std::cout << "First round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+1);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE);

    events.clear();

    std::cout << "Second round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+20);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], true, INFINITE);
    events.clear();

    for (int j=0; j<thread_count; ++j)
        jobs.push(-1);

    WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE);

    return 0;
}
Run Code Online (Sandbox Code Playgroud)