高效的C++ 11设计为事件监听器定期唤醒?

Dar*_*ook 7 events multithreading latency future c++11

问题

我有一个系统(使用C++ 11)与不规则事件的生产者:( P例如,它可能是UI事件,或者它可能通过TCP/IP套接字接收金融交易数据等).每个事件都附带一个小数据包.

然后,我有一个数字"工蜂"的:B1,B2,B3,...每一种确实的事件的自己加工P饲料他们.他们的处理可能很快,但可能需要很长时间,这就是为什么计划是在自己的线程中运行每个工蜂.此外,每个工蜂需要每N秒运行一次不同的功能(例如N = 60,但它也可能因工蜂而异).此常规处理应始终与事件处理串行完成,永远不要在不同的线程上完成.

最后,一些工蜂也可能会从其他生产活动(P2,P3等),但如果复杂的事情,我总是可以有P1,P2等喂到中央P,他们的工作是将所有事件发送到工蜂.

问题

这种系统的最佳设计是什么?低延迟和高效率是最佳的主要标准.可靠性也很重要:每个B事件都必须接收每个事件(即使它们是批量生成的,因为它当时很忙),如果一个B事件崩溃,它不应该影响其他事件.

如果重要:假设1-64个工蜂,4-8个硬件线程,事件之间的平均时间为10秒,事件之间的最短时间为0.2秒,典型的常规功能是每秒N=60.但如果理想的设计对任何这些标准都敏感,我想了解如何.

注意:如果工蜂可以保证永远不会抛出异常,那是否会改变最佳设计的选择?(感觉这将是无关的,但我想我会把它提起来.)

注意:蜜蜂可能比硬件线程更多; 假设这是另一个问题.(例如,延迟可能对某些工蜂很重要,并且可能会给他们自己的线程,而其他人可能会被告知共享一个线程.)

想法一:等到事件或超时

每个P都有互斥和条件.当它获得新数据时,它会发出信号.

每个工蜂使用theCondition.wait_until(lock,timeout)这里timeout是下一个需要醒来做定期处理的时间.它检查返回值以查看它是否已发出信号或超时.

这里的缺点似乎是它只是一个信号,没有数据.所以我需要每个B人获得另一个锁,以便对数据队列进行读访问.通常他们都希望同时做到这一点,所以这变得很难看.

我也不清楚如果B需要花费很长时间处理某些事情并在它wait_until再次调用之前错过了几个事件会发生什么.

想法二:每个工人的数据队列

这里每个B都有一个带锁的队列.P获取写锁定,并添加数据项.B获得一个读锁定,以便在准备好时关闭每个项目.B因为有新数据,我仍然需要一些知道醒来的方法.

这里的缺点似乎是P线程需要循环遍历每个B以给它们数据.这会引入延迟,并且也会感到脆弱(例如,如果其中一个工蜂表现不佳).

想法三:期货

这个问题感觉非常适合未来. P创建一个std::promise,然后每个B得到一个std::future(a std::shared_future,我假设).当P收到一个新事件时,它会调用set_value()诺言.每个人都B在呼唤wait_until着自己的未来.

当信号和数据汇集在一起​​时,这很有吸引力.也没有锁定,所以它应该是有弹性的.

我坚持的一点是承诺/未来是一颗子弹枪.我需要在每个新事件之后立即创建一组新的promise/shared_future对.怎么可能工作?(shared_future作为set_value呼叫发送的数据的一部分,我可以通过下一个吗?)如果两个事件连续快速发生,是否有任何工作人员错过任何事件?

wbe*_*ett 4

听起来您可以从生产者-消费者模式中受益。下面是一个使用 boost 库和无锁队列(来自 boost)的示例,只需更改它正在操作的类型:

boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);

boost::lockfree::queue<int> queue(128);

const int iterations = 10000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;

void producer(void)
{
    for (int i = 0; i != iterations; ++i) {
       int value = ++producer_count;
        while (!queue.push(value))
            ;
    }
}

boost::atomic<bool> done (false);
void consumer(void)
{
    int value;
    while (!done) {
        while (queue.pop(value))
            ++consumer_count;
    }

    while (queue.pop(value))
        ++consumer_count;
}

int main(int argc, char* argv[])
{
    using namespace std;
    cout << "boost::lockfree::queue is ";
    if (!queue.is_lock_free())
        cout << "not ";
    cout << "lockfree" << endl;

    boost::thread_group producer_threads, consumer_threads;

    for (int i = 0; i != producer_thread_count; ++i)
        producer_threads.create_thread(producer);

    for (int i = 0; i != consumer_thread_count; ++i)
        consumer_threads.create_thread(consumer);

    producer_threads.join_all();
    done = true;

    consumer_threads.join_all();

    cout << "produced " << producer_count << " objects." << endl;
    cout << "consumed " << consumer_count << " objects." << endl;
}
Run Code Online (Sandbox Code Playgroud)