uni*_*nob 8 c++ multithreading pthreads
我得到了需要在不同阶段处理的各种类型的传感器数据。根据我所阅读的内容,最有效的方法是将任务拆分为线程。每个将处理后的数据放入下一个线程的入口队列中。所以基本上,一个管道。
数据可能非常大(几 Mbs),因此需要将其从传感器缓冲区中复制出来,然后传递给将修改它并传递它的线程。
我有兴趣了解进行传球的最佳方式。我读到,如果我在线程之间进行消息发布,我可以分配数据并将指针传递给其他线程,以便接收线程可以处理取消分配它。我不太确定,这对于流数据如何工作,即确保线程按顺序处理消息(我想我可以添加时间检查?)。另外,我应该为这样的实现使用什么数据结构?我想我无论如何都需要使用锁?
使用同步队列会更有效吗?
让我知道其他解决方案是否更好。计算需要实时进行,所以我需要这样做才能真正高效。如果有人链接到通过线程管道传递数据的良好示例,我将非常有兴趣查看它。
警告:没有 boost 或其他库。使用线程。我需要使实现尽可能接近标准库。这最终将用于各种平台(我还不知道)。
我最近不得不做类似的事情。我使用了输入/输出队列的方法。我认为是最好和最快速的使用方法。这是我的线程安全并发队列版本。我在我的项目中有三个工作线程按顺序对同一个缓冲区等进行大量计算。每个线程使用来自输入队列的 pop 并推送输出队列。所以我有这个 wpop 等待队列中可用的下一个缓冲区。我希望对你有用。
/*
Thread_safe queue with conditional variable
*/
template<typename dataType>
class CConcurrentQueue
{
private:
/// Queue
std::queue<dataType> m_queue;
/// Mutex to controll multiple access
std::mutex m_mutex;
/// Conditional variable used to fire event
std::condition_variable m_cv;
/// Atomic variable used to terminate immediately wpop and wtpop functions
std::atomic<bool> m_forceExit = false;
public:
/// <summary> Add a new element in the queue. </summary>
/// <param name="data"> New element. </param>
void push ( dataType const& data )
{
m_forceExit.store ( false );
std::unique_lock<std::mutex> lk ( m_mutex );
m_queue.push ( data );
lk.unlock ();
m_cv.notify_one ();
}
/// <summary> Check queue empty. </summary>
/// <returns> True if the queue is empty. </returns>
bool isEmpty () const
{
std::unique_lock<std::mutex> lk ( m_mutex );
return m_queue.empty ();
}
/// <summary> Pop element from queue. </summary>
/// <param name="popped_value"> [in,out] Element. </param>
/// <returns> false if the queue is empty. </returns>
bool pop ( dataType& popped_value )
{
std::unique_lock<std::mutex> lk ( m_mutex );
if ( m_queue.empty () )
{
return false;
}
else
{
popped_value = m_queue.front ();
m_queue.pop ();
return true;
}
}
/// <summary> Wait and pop an element in the queue. </summary>
/// <param name="popped_value"> [in,out] Element. </param>
/// <returns> False for forced exit. </returns>
bool wpop ( dataType& popped_value )
{
std::unique_lock<std::mutex> lk ( m_mutex );
m_cv.wait ( lk, [&]()->bool{ return !m_queue.empty () || m_forceExit.load(); } );
if ( m_forceExit.load() ) return false;
popped_value = m_queue.front ();
m_queue.pop ();
return true;
}
/// <summary> Timed wait and pop an element in the queue. </summary>
/// <param name="popped_value"> [in,out] Element. </param>
/// <param name="milliseconds"> [in] Wait time. </param>
/// <returns> False for timeout or forced exit. </returns>
bool wtpop ( dataType& popped_value , long milliseconds = 1000)
{
std::unique_lock<std::mutex> lk ( m_mutex );
m_cv.wait_for ( lk, std::chrono::milliseconds ( milliseconds ), [&]()->bool{ return !m_queue.empty () || m_forceExit.load(); } );
if ( m_forceExit.load() ) return false;
if ( m_queue.empty () ) return false;
popped_value = m_queue.front ();
m_queue.pop ();
return true;
}
/// <summary> Queue size. </summary>
int size ()
{
std::unique_lock<std::mutex> lk ( m_mutex );
return static_cast< int >( m_queue.size () );
}
/// <summary> Free the queue and force stop. </summary>
void clear ()
{
m_forceExit.store( true );
std::unique_lock<std::mutex> lk ( m_mutex );
while ( !m_queue.empty () )
{
delete m_queue.front ();
m_queue.pop ();
}
}
/// <summary> Check queue in forced exit state. </summary>
bool isExit () const
{
return m_forceExit.load();
}
};
Run Code Online (Sandbox Code Playgroud)