sbi*_*sbi 14 c++ multithreading vxworks
我们正在使用位于VxWorks 5.5顶层的专有嵌入式平台进行编程.在我们的工具箱中,我们有一个条件变量,它是使用VxWorks二进制信号量实现的.
现在,POSIX提供了一个等待函数,它也需要一个互斥量.这将解锁互斥锁(以便某些其他任务可能写入数据)并等待另一个任务发出信号(完成写入数据).我相信这实现了所谓的监视器,ICBWT.
我们需要这样的等待函数,但实现它很棘手.一个简单的方法就是这样做:
bool condition::wait_for(mutex& mutex) const {
unlocker ul(mutex); // relinquish mutex
return wait(event);
} // ul's dtor grabs mutex again
Run Code Online (Sandbox Code Playgroud)
然而,这是一种竞争条件,因为它允许另一项任务在解锁之后和等待之前抢占这一任务.另一个任务可以写入解锁后的日期,并在此任务开始等待信号量之前发出信号.(我们已经对此进行了测试,确实发生了这种情况并永远阻止了等待任务.)
鉴于VxWorks 5.5似乎没有提供API来等待信号暂时放弃信号量,有没有办法在提供的同步例程之上实现它?
注意: 这是一个非常老的VxWorks版本,在 没有POSIX支持的情况下编译 (由专有硬件的供应商提供,根据我的理解).
对于本机vxworks,这应该非常简单,这里需要一个消息队列.您的wait_for方法可以按原样使用.
bool condition::wait_for(mutex& mutex) const
{
unlocker ul(mutex); // relinquish mutex
return wait(event);
} // ul's dtor grabs mutex again
Run Code Online (Sandbox Code Playgroud)
但wait(事件)代码看起来像这样:
wait(event)
{
if (msgQRecv(event->q, sigMsgBuf, sigMsgSize, timeoutTime) == OK)
{
// got it...
}
else
{
// timeout, report error or something like that....
}
}
Run Code Online (Sandbox Code Playgroud)
你的信号代码会像这样:
signal(event)
{
msgQSend(event->q, sigMsg, sigMsgSize, NO_WAIT, MSG_PRI_NORMAL);
}
Run Code Online (Sandbox Code Playgroud)
因此,如果在您开始等待之前触发了信号,那么msgQRecv会在最终被调用时立即返回信号,然后您可以再次在ul dtor中使用互斥锁,如上所述.
event-> q是MSG_Q_ID,它是在事件创建时创建的,调用了msgQCreate,而sigMsg中的数据是由你定义的......但是可以只是一个随机的数据字节,或者你可以想出一个更智能的结构,包含有关谁发信号的信息或其他可能不太知道的信息.
多个服务员的更新,这有点棘手:所以我会做一些假设来简化事情
这种方法使用计数信号量,类似于上面的只有一点额外的逻辑:
wait(event)
{
if (semTake(event->csm, timeoutTime) == OK)
{
// got it...
}
else
{
// timeout, report error or something like that....
}
}
Run Code Online (Sandbox Code Playgroud)
你的信号代码会像这样:
signal(event)
{
for (int x = 0; x < event->numberOfWaiters; x++)
{
semGive(event->csm);
}
}
Run Code Online (Sandbox Code Playgroud)
事件的创建是这样的,记住在这个例子中,服务员的数量是恒定的并且在事件创建时是已知的.你可以让它变得动态,但关键是每次事件发生时,numberOfWaiters必须在解锁器解锁互斥锁之前是正确的.
createEvent(numberOfWaiters)
{
event->numberOfWaiters = numberOfWaiters;
event->csv = semCCreate(SEM_Q_FIFO, 0);
return event;
}
Run Code Online (Sandbox Code Playgroud)
你不能对numberOfWaiters说道:DI会再说一遍:在解锁器解锁互斥锁之前,numberOfWaiters必须正确.要使其动态化(如果这是一项要求),您可以添加setNumWaiters(numOfWaiters)函数,并在解锁器解锁互斥锁之前在wait_for函数中调用该函数,只要它始终正确设置数字即可.
现在对于最后一个技巧,如上所述,假设一个任务负责解锁互斥锁,其余任务只是等待信号,这意味着一个且只有一个任务将调用上面的wait_for()函数,其余的任务只是调用wait(事件)函数.
考虑到这一点,numberOfWaiters的计算方法如下:
当然,如果你真的需要,你也可以使这个更复杂,但是这可能会有效,因为通常1个任务会触发一个事件,但很多任务都想知道它是完整的,这就是它所提供的.
但您的基本流程如下:
init()
{
event->createEvent(3);
}
eventHandler()
{
locker l(mutex);
doEventProcessing();
signal(event);
}
taskA()
{
doOperationThatTriggersAnEvent();
wait_for(mutex);
eventComplete();
}
taskB()
{
doWhateverIWant();
// now I need to know if the event has occurred...
wait(event);
coolNowIKnowThatIsDone();
}
taskC()
{
taskCIsFun();
wait(event);
printf("event done!\n");
}
Run Code Online (Sandbox Code Playgroud)
当我写上面的内容时,我觉得所有的OO概念都已经死了,但希望你能得到这个想法,实际上wait和wait_for应该采用相同的参数,或者没有参数,而是同一个类的成员,它们也拥有所有数据需要知道...但是,它是如何工作的概述.
小智 3
如果每个等待任务等待单独的二进制信号量,则可以避免竞争条件。这些信号量必须注册在一个容器中,信号任务使用该容器来解除对所有等待任务的阻塞。容器必须由互斥体保护。
该wait_for()方法获取一个二进制信号量,等待它,最后删除它。
void condition::wait_for(mutex& mutex) {
SEM_ID sem = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY);
{
lock l(listeners_mutex); // assure exclusive access to listeners container
listeners.push_back(sem);
} // l's dtor unlocks listeners_mutex again
unlocker ul(mutex); // relinquish mutex
semTake(sem, WAIT_FOREVER);
{
lock l(listeners_mutex);
// remove sem from listeners
// ...
semDelete(sem);
}
} // ul's dtor grabs mutex again
Run Code Online (Sandbox Code Playgroud)
该signal()方法迭代所有已注册的信号量并解锁它们。
void condition::signal() {
lock l(listeners_mutex);
for_each (listeners.begin(), listeners.end(), /* call semGive()... */ )
}
Run Code Online (Sandbox Code Playgroud)
这种方法确保wait_for()永远不会错过任何一个信号。缺点是需要额外的系统资源。为了避免为每次调用创建和销毁信号量wait_for(),可以使用池。
| 归档时间: |
|
| 查看次数: |
1883 次 |
| 最近记录: |