fen*_*aun 56 c++ python event-loop blocking
如果你使用过gui工具包,你就知道有一个事件循环/主循环应该在一切完成后执行,这将使应用程序保持活跃并响应不同的事件.例如,对于Qt,您可以在main()中执行此操作:
int main() {
QApplication app(argc, argv);
// init code
return app.exec();
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,app.exec()是应用程序的主循环.
实现这种循环的显而易见的方法是:
void exec() {
while (1) {
process_events(); // create a thread for each new event (possibly?)
}
}
Run Code Online (Sandbox Code Playgroud)
但是这会将CPU限制在100%并且实际上是无用的.现在,我如何实现这样一个响应的事件循环,而不必完全占用CPU?
回答在Python和/或C++中受到赞赏.谢谢.
脚注:为了学习,我将实现自己的信号/插槽,我会用它们来生成自定义事件(例如go_forward_event(steps)).但是如果你知道如何手动使用系统事件,我也想知道这一点.
小智 72
我曾经多次想到这一点!
GUI主循环在伪代码中如下所示:
void App::exec() {
for(;;) {
vector<Waitable> waitables;
waitables.push_back(m_networkSocket);
waitables.push_back(m_xConnection);
waitables.push_back(m_globalTimer);
Waitable* whatHappened = System::waitOnAll(waitables);
switch(whatHappened) {
case &m_networkSocket: readAndDispatchNetworkEvent(); break;
case &m_xConnection: readAndDispatchGuiEvent(); break;
case &m_globalTimer: readAndDispatchTimerEvent(); break;
}
}
}
Run Code Online (Sandbox Code Playgroud)
什么是"等待"?嗯,这取决于系统.在UNIX上,它被称为"文件描述符","waitOnAll"是:: select系统调用.所谓的vector<Waitable>是::fd_set在UNIX上,实际上是通过查询"whatHappened" FD_ISSET.实际的waitable-handle以各种方式获取,例如m_xConnection可以从:: XConnectionNumber()获取.X11还提供了一个高层次的,可移植的API为这一点- :: XNextEvent例行() -但如果你使用,你将无法等待几个事件源同时.
阻止如何工作?"waitOnAll"是一个系统调用,告诉操作系统将您的进程置于"睡眠列表"中.这意味着在其中一个等待事件发生之前,您不会获得任何CPU时间.这意味着您的进程处于空闲状态,消耗0%的CPU.当事件发生时,您的进程将对其作出短暂反应,然后返回空闲状态.GUI应用程序几乎把所有时间花在闲置上.
你睡觉时所有CPU周期会发生什么?要看.有时另一个过程会对它们有用.如果没有,您的操作系统将忙于循环CPU,或将其置于临时低功耗模式等.
请询问更多详情!
Vas*_*sil 21
蟒蛇:
您可以查看Twisted reactor的实现,这可能是python中事件循环的最佳实现.Twisted中的反应器是接口的实现,您可以指定要运行的类型反应器:select,epoll,kqueue(所有基于使用这些系统调用的ac api),还有基于QT和GTK工具包的反应器.
一个简单的实现是使用select:
#echo server that accepts multiple client connections without forking threads
import select
import socket
import sys
host = ''
port = 50000
backlog = 5
size = 1024
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((host,port))
server.listen(backlog)
input = [server,sys.stdin]
running = 1
#the eventloop running
while running:
inputready,outputready,exceptready = select.select(input,[],[])
for s in inputready:
if s == server:
# handle the server socket
client, address = server.accept()
input.append(client)
elif s == sys.stdin:
# handle standard input
junk = sys.stdin.readline()
running = 0
else:
# handle all other sockets
data = s.recv(size)
if data:
s.send(data)
else:
s.close()
input.remove(s)
server.close()
Run Code Online (Sandbox Code Playgroud)
小智 10
我会使用一个简单,轻量级的消息库,名为ZeroMQ(http://www.zeromq.org/).它是一个开源库(LGPL).这是一个非常小的图书馆; 在我的服务器上,整个项目在大约60秒内编译.
ZeroMQ将极大地简化您的事件驱动代码,并且它在性能方面也是最有效的解决方案.使用ZeroMQ在线程之间进行通信比使用信号量或本地UNIX套接字快得多(就速度而言).ZeroMQ也是100%可移植的解决方案,而所有其他解决方案将您的代码绑定到特定的操作系统.
这是一个 C++ 事件循环。在创建对象时EventLoop,它会创建一个线程,该线程不断运行分配给它的任何任务。如果没有可用的任务,主线程将进入休眠状态,直到添加任务为止。
首先,我们需要一个线程安全队列,它允许多个生产者和至少一个消费者(线程EventLoop)。对象EventLoop控制消费者和生产者。只需稍加改动,就可以添加多个消费者(运行线程),而不是只添加一个线程。
#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <set>
#include <functional>
#if defined( WIN32 )
#include <windows.h>
#endif
class EventLoopNoElements : public std::runtime_error
{
public:
EventLoopNoElements(const char* error)
: std::runtime_error(error)
{
}
};
template <typename Type>
struct EventLoopCompare {
typedef std::tuple<std::chrono::time_point<std::chrono::system_clock>, Type> TimePoint;
bool operator()(const typename EventLoopCompare<Type>::TimePoint left, const typename EventLoopCompare<Type>::TimePoint right) {
return std::get<0>(left) < std::get<0>(right);
}
};
/**
* You can enqueue any thing with this event loop. Just use lambda functions, future and promises!
* With lambda `event.enqueue( 1000, [myvar, myfoo](){ myvar.something(myfoo); } )`
* With futures we can get values from the event loop:
* ```
* std::promise<int> accumulate_promise;
* event.enqueue( 2000, [&accumulate_promise](){ accumulate_promise.set_value(10); } );
* std::future<int> accumulate_future = accumulate_promise.get_future();
* accumulate_future.wait(); // It is not necessary to call wait, except for syncing the output.
* std::cout << "result=" << std::flush << accumulate_future.get() << std::endl;
* ```
* It is just not a nice ideia to add something which hang the whole event loop queue.
*/
template <class Type>
struct EventLoop {
typedef std::multiset<
typename EventLoopCompare<Type>::TimePoint,
EventLoopCompare<Type>
> EventLoopQueue;
bool _shutdown;
bool _free_shutdown;
std::mutex _mutex;
std::condition_variable _condition_variable;
EventLoopQueue _queue;
std::thread _runner;
// free_shutdown - if true, run all events on the queue before exiting
EventLoop(bool free_shutdown)
: _shutdown(false),
_free_shutdown(free_shutdown),
_runner( &EventLoop<Type>::_event_loop, this )
{
}
virtual ~EventLoop() {
std::unique_lock<std::mutex> dequeuelock(_mutex);
_shutdown = true;
_condition_variable.notify_all();
dequeuelock.unlock();
if (_runner.joinable()) {
_runner.join();
}
}
// Mutex and condition variables are not movable and there is no need for smart pointers yet
EventLoop(const EventLoop&) = delete;
EventLoop& operator =(const EventLoop&) = delete;
EventLoop(const EventLoop&&) = delete;
EventLoop& operator =(const EventLoop&&) = delete;
// To allow multiple threads to consume data, just add a mutex here and create multiple threads on the constructor
void _event_loop() {
while ( true ) {
try {
Type call = dequeue();
call();
}
catch (EventLoopNoElements&) {
return;
}
catch (std::exception& error) {
std::cerr << "Unexpected exception on EventLoop dequeue running: '" << error.what() << "'" << std::endl;
}
catch (...) {
std::cerr << "Unexpected exception on EventLoop dequeue running." << std::endl;
}
}
std::cerr << "The main EventLoop dequeue stopped running unexpectedly!" << std::endl;
}
// Add an element to the queue
void enqueue(int timeout, Type element) {
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
std::chrono::time_point<std::chrono::system_clock> newtime = timenow + std::chrono::milliseconds(timeout);
std::unique_lock<std::mutex> dequeuelock(_mutex);
_queue.insert(std::make_tuple(newtime, element));
_condition_variable.notify_one();
}
// Blocks until getting the first-element or throw EventLoopNoElements if it is shutting down
// Throws EventLoopNoElements when it is shutting down and there are not more elements
Type dequeue() {
typename EventLoopQueue::iterator queuebegin;
typename EventLoopQueue::iterator queueend;
std::chrono::time_point<std::chrono::system_clock> sleeptime;
// _mutex prevents multiple consumers from getting the same item or from missing the wake up
std::unique_lock<std::mutex> dequeuelock(_mutex);
do {
queuebegin = _queue.begin();
queueend = _queue.end();
if ( queuebegin == queueend ) {
if ( _shutdown ) {
throw EventLoopNoElements( "There are no more elements on the queue because it already shutdown." );
}
_condition_variable.wait( dequeuelock );
}
else {
if ( _shutdown ) {
if (_free_shutdown) {
break;
}
else {
throw EventLoopNoElements( "The queue is shutting down." );
}
}
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
sleeptime = std::get<0>( *queuebegin );
if ( sleeptime <= timenow ) {
break;
}
_condition_variable.wait_until( dequeuelock, sleeptime );
}
} while ( true );
Type firstelement = std::get<1>( *queuebegin );
_queue.erase( queuebegin );
dequeuelock.unlock();
return firstelement;
}
};
Run Code Online (Sandbox Code Playgroud)
打印当前时间戳的实用程序:
#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <set>
#include <functional>
#if defined( WIN32 )
#include <windows.h>
#endif
class EventLoopNoElements : public std::runtime_error
{
public:
EventLoopNoElements(const char* error)
: std::runtime_error(error)
{
}
};
template <typename Type>
struct EventLoopCompare {
typedef std::tuple<std::chrono::time_point<std::chrono::system_clock>, Type> TimePoint;
bool operator()(const typename EventLoopCompare<Type>::TimePoint left, const typename EventLoopCompare<Type>::TimePoint right) {
return std::get<0>(left) < std::get<0>(right);
}
};
/**
* You can enqueue any thing with this event loop. Just use lambda functions, future and promises!
* With lambda `event.enqueue( 1000, [myvar, myfoo](){ myvar.something(myfoo); } )`
* With futures we can get values from the event loop:
* ```
* std::promise<int> accumulate_promise;
* event.enqueue( 2000, [&accumulate_promise](){ accumulate_promise.set_value(10); } );
* std::future<int> accumulate_future = accumulate_promise.get_future();
* accumulate_future.wait(); // It is not necessary to call wait, except for syncing the output.
* std::cout << "result=" << std::flush << accumulate_future.get() << std::endl;
* ```
* It is just not a nice ideia to add something which hang the whole event loop queue.
*/
template <class Type>
struct EventLoop {
typedef std::multiset<
typename EventLoopCompare<Type>::TimePoint,
EventLoopCompare<Type>
> EventLoopQueue;
bool _shutdown;
bool _free_shutdown;
std::mutex _mutex;
std::condition_variable _condition_variable;
EventLoopQueue _queue;
std::thread _runner;
// free_shutdown - if true, run all events on the queue before exiting
EventLoop(bool free_shutdown)
: _shutdown(false),
_free_shutdown(free_shutdown),
_runner( &EventLoop<Type>::_event_loop, this )
{
}
virtual ~EventLoop() {
std::unique_lock<std::mutex> dequeuelock(_mutex);
_shutdown = true;
_condition_variable.notify_all();
dequeuelock.unlock();
if (_runner.joinable()) {
_runner.join();
}
}
// Mutex and condition variables are not movable and there is no need for smart pointers yet
EventLoop(const EventLoop&) = delete;
EventLoop& operator =(const EventLoop&) = delete;
EventLoop(const EventLoop&&) = delete;
EventLoop& operator =(const EventLoop&&) = delete;
// To allow multiple threads to consume data, just add a mutex here and create multiple threads on the constructor
void _event_loop() {
while ( true ) {
try {
Type call = dequeue();
call();
}
catch (EventLoopNoElements&) {
return;
}
catch (std::exception& error) {
std::cerr << "Unexpected exception on EventLoop dequeue running: '" << error.what() << "'" << std::endl;
}
catch (...) {
std::cerr << "Unexpected exception on EventLoop dequeue running." << std::endl;
}
}
std::cerr << "The main EventLoop dequeue stopped running unexpectedly!" << std::endl;
}
// Add an element to the queue
void enqueue(int timeout, Type element) {
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
std::chrono::time_point<std::chrono::system_clock> newtime = timenow + std::chrono::milliseconds(timeout);
std::unique_lock<std::mutex> dequeuelock(_mutex);
_queue.insert(std::make_tuple(newtime, element));
_condition_variable.notify_one();
}
// Blocks until getting the first-element or throw EventLoopNoElements if it is shutting down
// Throws EventLoopNoElements when it is shutting down and there are not more elements
Type dequeue() {
typename EventLoopQueue::iterator queuebegin;
typename EventLoopQueue::iterator queueend;
std::chrono::time_point<std::chrono::system_clock> sleeptime;
// _mutex prevents multiple consumers from getting the same item or from missing the wake up
std::unique_lock<std::mutex> dequeuelock(_mutex);
do {
queuebegin = _queue.begin();
queueend = _queue.end();
if ( queuebegin == queueend ) {
if ( _shutdown ) {
throw EventLoopNoElements( "There are no more elements on the queue because it already shutdown." );
}
_condition_variable.wait( dequeuelock );
}
else {
if ( _shutdown ) {
if (_free_shutdown) {
break;
}
else {
throw EventLoopNoElements( "The queue is shutting down." );
}
}
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
sleeptime = std::get<0>( *queuebegin );
if ( sleeptime <= timenow ) {
break;
}
_condition_variable.wait_until( dequeuelock, sleeptime );
}
} while ( true );
Type firstelement = std::get<1>( *queuebegin );
_queue.erase( queuebegin );
dequeuelock.unlock();
return firstelement;
}
};
Run Code Online (Sandbox Code Playgroud)
使用这些的示例程序:
std::string getTime() {
char buffer[20];
#if defined( WIN32 )
SYSTEMTIME wlocaltime;
GetLocalTime(&wlocaltime);
::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto hours = std::chrono::duration_cast< std::chrono::hours >( duration );
duration -= hours;
auto minutes = std::chrono::duration_cast< std::chrono::minutes >( duration );
duration -= minutes;
auto seconds = std::chrono::duration_cast< std::chrono::seconds >( duration );
duration -= seconds;
auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( duration );
duration -= milliseconds;
time_t theTime = time( NULL );
struct tm* aTime = localtime( &theTime );
::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, milliseconds.count());
#endif
return buffer;
}
Run Code Online (Sandbox Code Playgroud)
输出测试示例:
02:08:28.960 Creating EventLoop
02:08:28.960 Adding event element
02:08:29.960 Running task 1
02:08:30.961 Running task 2
02:08:31.961 Running task 3
02:08:33.961 Exiting after 10 seconds...
Run Code Online (Sandbox Code Playgroud)
最后,所呈现的事件循环就像一个时间管理器。对于时间管理器来说,更好的界面是不强制用户使用线程。这是一个例子:
// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[]) {
std::cerr << getTime().c_str() << "Creating EventLoop" << std::endl;
EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);
std::cerr << getTime().c_str() << "Adding event element" << std::endl;
eventloop->enqueue( 3000, []{ std::cerr << getTime().c_str() << "Running task 3" << std::endl; } );
eventloop->enqueue( 1000, []{ std::cerr << getTime().c_str() << "Running task 1" << std::endl; } );
eventloop->enqueue( 2000, []{ std::cerr << getTime().c_str() << "Running task 2" << std::endl; } );
std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
delete eventloop;
std::cerr << getTime().c_str() << "Exiting after 10 seconds..." << std::endl;
return 0;
}
Run Code Online (Sandbox Code Playgroud)