Ger*_*ald 5 c++ reactor poco-libraries
所以我开始对实现大容量客户端/服务器系统的替代方案进行一些研究,我现在正在研究Poco的Reactor框架,因为我现在正在为我的应用程序框架使用Poco.
传入的数据包大小将非常小,所以我认为从读取客户端数据的角度来看它会正常工作.但是,基于客户端输入执行的操作将相对昂贵,并且可能需要卸载到另一个进程或甚至另一个服务器.并且发送回客户端的响应有时会相当大.所以很明显我无法阻止反应器线程发生.
所以我想如果我只是读取reactor事件处理程序中的数据然后将它传递给处理数据的另一个线程(池),它会更好.
我不太确定的是在操作完成后将响应发送回客户端的过程.
我找不到太多关于使用框架的最佳方法的信息.但我已经做了一些测试,看起来反应堆将在套接字可写时反复触发WritableNotification事件.那么最佳过程是排队需要在接收WritableNotification事件的对象中发送的数据,并在每次收到事件时发送小块?
更新:所以当我开始测试时,我惊恐地发现服务器应用程序运行在单个连接上的服务器CPU使用率上升到100%.但经过一番挖掘,我发现我做错了什么.我发现在创建服务处理程序时我不需要注册WritableNotification事件,我只需要在有数据发送时注册.然后,一旦发送了所有数据,我应该取消注册事件处理程序.这样,当没有任何东西要发送时,反应器不必一直反复调用事件处理程序.现在,即使有100个连接,我的CPU使用率也接近0.呼!
我编写了一个从 SocketConnector 复制的 ServerConnector 类,但不调用套接字的连接,因为套接字已经连接,如果在 TcpServerConnection 的 run() 函数中使用 ServiceHandler 启动反应器以获取通知,则 TcpServer 类将开始一个新线程。所以,我得到了reactor-partten的多线程,但我不知道这是不是最好的方法。
服务器连接器类
template <class ServiceHandler>
class ServerConnector
{
public:
explicit ServerConnector(StreamSocket& ss):
_pReactor(0),
_socket(ss)
/// Creates a ServerConnector, using the given Socket.
{
}
ServerConnector(StreamSocket& ss, SocketReactor& reactor):
_pReactor(0),
_socket(ss)
/// Creates an acceptor, using the given ServerSocket.
/// The ServerConnector registers itself with the given SocketReactor.
{
registerConnector(reactor);
onConnect();
}
virtual ~ServerConnector()
/// Destroys the ServerConnector.
{
unregisterConnector();
}
//
// this part is same with SocketConnector
//
private:
ServerConnector();
ServerConnector(const ServerConnector&);
ServerConnector& operator = (const ServerConnector&);
StreamSocket& _socket;
SocketReactor* _pReactor;
};
Run Code Online (Sandbox Code Playgroud)
Echo-Service是一个通用的ServiceHander
class EchoServiceHandler
{
public:
EchoServiceHandler(StreamSocket& socket, SocketReactor& reactor):
_socket(socket),
_reactor(reactor)
{
_reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
_reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
}
~EchoServiceHandler()
{
_reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
_reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
}
void onReadable(ReadableNotification* pNf)
{
pNf->release();
char buffer[4096];
try {
int n = _socket.receiveBytes(buffer, sizeof(buffer));
if (n > 0)
{
_socket.sendBytes(buffer, n);
} else
onError();
} catch( ... ) {
onError();
}
}
void onError(ErrorNotification* pNf)
{
pNf->release();
onError();
}
void onError()
{
_socket.shutdown();
_socket.close();
_reactor.stop();
delete this;
}
private:
StreamSocket _socket;
SocketReactor& _reactor;
};
Run Code Online (Sandbox Code Playgroud)
EchoReactorConnection 与 TcpServer 类一起工作以将reactor作为线程运行
class EchoReactorConnection: public TCPServerConnection
{
public:
EchoReactorConnection(const StreamSocket& s): TCPServerConnection(s)
{
}
void run()
{
StreamSocket& ss = socket();
SocketReactor reactor;
ServerConnector<EchoServiceHandler> sc(ss, reactor);
reactor.run();
std::cout << "exit EchoReactorConnection thread" << std::endl;
}
};
Run Code Online (Sandbox Code Playgroud)
cppunit 测试用例与 TCPServerTest::testMultiConnections 相同,但使用 EchoReactorConnection 进行多线程。
void TCPServerTest::testMultithreadReactor()
{
ServerSocket svs(0);
TCPServerParams* pParams = new TCPServerParams;
pParams->setMaxThreads(4);
pParams->setMaxQueued(4);
pParams->setThreadIdleTime(100);
TCPServer srv(new TCPServerConnectionFactoryImpl<EchoReactorConnection>(), svs, pParams);
srv.start();
assert (srv.currentConnections() == 0);
assert (srv.currentThreads() == 0);
assert (srv.queuedConnections() == 0);
assert (srv.totalConnections() == 0);
//
// same with TCPServerTest::testMultiConnections()
//
// ....
///
}
Run Code Online (Sandbox Code Playgroud)