Dar*_*ook 7 events multithreading latency future c++11
问题
我有一个系统(使用C++ 11)与不规则事件的生产者:( P例如,它可能是UI事件,或者它可能通过TCP/IP套接字接收金融交易数据等).每个事件都附带一个小数据包.
然后,我有一个数字"工蜂"的:B1,B2,B3,...每一种确实的事件的自己加工P饲料他们.他们的处理可能很快,但可能需要很长时间,这就是为什么计划是在自己的线程中运行每个工蜂.此外,每个工蜂需要每N秒运行一次不同的功能(例如N = 60,但它也可能因工蜂而异).此常规处理应始终与事件处理串行完成,永远不要在不同的线程上完成.
最后,一些工蜂也可能会从其他生产活动(P2,P3等),但如果复杂的事情,我总是可以有P1,P2等喂到中央P,他们的工作是将所有事件发送到工蜂.
问题
这种系统的最佳设计是什么?低延迟和高效率是最佳的主要标准.可靠性也很重要:每个B事件都必须接收每个事件(即使它们是批量生成的,因为它当时很忙),如果一个B事件崩溃,它不应该影响其他事件.
如果重要:假设1-64个工蜂,4-8个硬件线程,事件之间的平均时间为10秒,事件之间的最短时间为0.2秒,典型的常规功能是每秒N=60.但如果理想的设计对任何这些标准都敏感,我想了解如何.
注意:如果工蜂可以保证永远不会抛出异常,那是否会改变最佳设计的选择?(感觉这将是无关的,但我想我会把它提起来.)
注意:蜜蜂可能比硬件线程更多; 假设这是另一个问题.(例如,延迟可能对某些工蜂很重要,并且可能会给他们自己的线程,而其他人可能会被告知共享一个线程.)
想法一:等到事件或超时
每个P都有互斥和条件.当它获得新数据时,它会发出信号.
每个工蜂使用theCondition.wait_until(lock,timeout)这里timeout是下一个需要醒来做定期处理的时间.它检查返回值以查看它是否已发出信号或超时.
这里的缺点似乎是它只是一个信号,没有数据.所以我需要每个B人获得另一个锁,以便对数据队列进行读访问.通常他们都希望同时做到这一点,所以这变得很难看.
我也不清楚如果B需要花费很长时间处理某些事情并在它wait_until再次调用之前错过了几个事件会发生什么.
想法二:每个工人的数据队列
这里每个B都有一个带锁的队列.P获取写锁定,并添加数据项.B获得一个读锁定,以便在准备好时关闭每个项目.B因为有新数据,我仍然需要一些知道醒来的方法.
这里的缺点似乎是P线程需要循环遍历每个B以给它们数据.这会引入延迟,并且也会感到脆弱(例如,如果其中一个工蜂表现不佳).
想法三:期货
这个问题感觉非常适合未来. P创建一个std::promise,然后每个B得到一个std::future(a std::shared_future,我假设).当P收到一个新事件时,它会调用set_value()诺言.每个人都B在呼唤wait_until着自己的未来.
当信号和数据汇集在一起时,这很有吸引力.也没有锁定,所以它应该是有弹性的.
我坚持的一点是承诺/未来是一颗子弹枪.我需要在每个新事件之后立即创建一组新的promise/shared_future对.怎么可能工作?(shared_future作为set_value呼叫发送的数据的一部分,我可以通过下一个吗?)如果两个事件连续快速发生,是否有任何工作人员错过任何事件?
听起来您可以从生产者-消费者模式中受益。下面是一个使用 boost 库和无锁队列(来自 boost)的示例,只需更改它正在操作的类型:
boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);
boost::lockfree::queue<int> queue(128);
const int iterations = 10000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;
void producer(void)
{
for (int i = 0; i != iterations; ++i) {
int value = ++producer_count;
while (!queue.push(value))
;
}
}
boost::atomic<bool> done (false);
void consumer(void)
{
int value;
while (!done) {
while (queue.pop(value))
++consumer_count;
}
while (queue.pop(value))
++consumer_count;
}
int main(int argc, char* argv[])
{
using namespace std;
cout << "boost::lockfree::queue is ";
if (!queue.is_lock_free())
cout << "not ";
cout << "lockfree" << endl;
boost::thread_group producer_threads, consumer_threads;
for (int i = 0; i != producer_thread_count; ++i)
producer_threads.create_thread(producer);
for (int i = 0; i != consumer_thread_count; ++i)
consumer_threads.create_thread(consumer);
producer_threads.join_all();
done = true;
consumer_threads.join_all();
cout << "produced " << producer_count << " objects." << endl;
cout << "consumed " << consumer_count << " objects." << endl;
}
Run Code Online (Sandbox Code Playgroud)