精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
大并发高效率通信服务器的实现是工作中经常要遇到的情况,我们锐英源研究过IOCP方面不少开源架构,比如spserver和网游服务器IOCP模型,
2010年11月份,QQ上网友:“美女重现人间”所在公司“重庆西希科技”要进行相关的大并发高效率通信服务器开发,经过了解后,选择我们锐英源
进行技术合作公司,我们锐英源考察过后,采用spserver开源项目做为开发平台,极大地缩小了西希科技的开发周期,并且降低了开发难度保证了产
品质量。西希科技应用spserver主要是3点:
1、线程内启动spserver;
2、类似文本聊天协议架构下的广播发送;
3、指定单用户发送,这点是修改spserver架构才实现的功能,下面会有此功能的使用演示;
下面是合作协议图片:
1、启动服务器
请先编译工程里的spserver工程,生成spserver.lib,来避免编译其它工程时出的:cannot open file "spserver.lib"这样的错误。
执行压缩包里的diatest工程对应的可执行文件,会启动一个对话框,在启动时,会创建个线程来启动spserver聊天iocp服务器,服务器监听
5555端口。
下面是启动界面的主窗口:
使用netstat可以看到端口监听的情况,使用命令netstat -a -p tcp可以看到监听上了5555端口。
在监听成功后,可以用telnet来模拟客户端,下面是telnet登录上的窗口:
下面是点击主窗口上“发包测试”后的窗口情况 :
上面的"shw test"文本就是单用户测试发包的结果。
为了实现上述功能,我们锐英源通过源代码理解建立了思路,下面先说源代码理解过程:
SP_Sid_t sid = { SP_Sid_t::ePushKey, SP_Sid_t::ePushSeq };
SP_Response * response = new SP_Response( sid );
response->addMessage( msg );
必须还要添加到
dispatcher.push( response );
return mEventArg->getResponseQueue()->push( response );
static BOOL addSend( SP_Session * session );
关键是用到了SP_IocpDispatcher,派发者
void SP_IocpEventHelper :: worker( void * arg )
{
SP_Session * session = (SP_Session*)arg;
SP_Handler * handler = session->getHandler();
SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
SP_IocpEventArg * eventArg = iocpSession->mEventArg;
SP_Response * response = new SP_Response( session->getSid() );
if( 0 != handler->handle( session->getRequest(), response ) ) {
session->setStatus( SP_Session::eWouldExit );
}
session->setRunning( 0 );
eventArg->getResponseQueue()->push( response );
}
改造服务器类,把event指针暴露出来
在得到上面结论后,形成了下面的修改思路
所有的添加代码都带有注释//by shw 20101105 向指定客户端发消息处理
1、暴露出事件参数,可以向响应队列里发响应。
影响文件spiocpserver.hpp,spiocpserver.cpp。这2个文件是spserver的主要文件,覆盖过后要重新编译spserver库。
2、修改ChatHandler,让直接可以发包
大家看上面代码可能会有所疑惑,请参考下面我们对spserver的总结来理解。
spserver的下载链接为
http://www.codeproject.com/KB/IP/IOCP_Server_Framework.aspx
在搜索引擎上查找发向使用者比较多
怎样测试?
源代码下载来后,解压,用VC6打开win32目录下的spserver.dsw文件,先编译spserver工程,再编译testiocpchat。
至于怎样写iocp通信服务器,可以参考testiocpchat工程,添加一些handler就可以了。
获取信息,回复信息。
防DDOS,一个IP多客户端。
SPServer 自带了一套用于压力测试的程序。
在 windows 平台是 testiocpecho.cpp 和 testiocpstress.cpp 。使用方法如下:
1)在一个 console 启动 testiocpecho.exe
E:\spserver-0.9.2\win32\testiocpecho\Debug>.\testiocpecho.exe
#1728 server type lf
#1728 Listen on port [3333]
#1728 Thread #2868 has been created to accept socket
#1728 [tp@unknown] create thread#4028
#1728 [tp@unknown] create thread#2492
#1728 [tp@unknown] create thread#1076
#1728 [tp@unknown] create thread#3204
2) 在另一个 console 启动 testiocpstress.exe
E:\spserver-0.9.2\win32\testiocpstress\Debug>.\testiocpstress.exe -c 100 -m 100
这个程序支持一些命令行参数,-c 用户指定模拟多少个 client ,-m 用于指定每个 client 发送多少条信息。
在一台有 512M 内存的 windows xp home edition 机器上,两个程序都在本机运行,可以稳定运行 5000 的并发连接。
如果上到 10K 的连接,在测试一段时间之后,会开始出现 10055 的错误。
限于目前没有更高配置的机器,没有办法做更大并发的测试。如果有人有兴趣,可以帮忙做一下测试。
===========================================================================
在 Unix/Linux 下是 testecho.cpp 和 teststress.cpp 。使用方法如下:
1) 启动 testecho
bash-2.05a$ ./testecho
testecho[15635]: Listen on port [3333]
testecho[15635]: [ex@work] Thread #1026 has been created for executor
testecho[15635]: [ex@act] Thread #2051 has been created for executor
2) 执行 teststress
bash-2.05a$ ./teststress -c 100 -m 1000
这个命令行指定模拟 100 个 client ,每个 client 发送 1000 个信息。
在 windows xp home edition sp2 上改了两个参数。其他版本的 windows 还没试过
SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Winsock
MaxUserPort REG_DWORD 5000-65534(十进制)(默认值0x1388--十进制为5000)
SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Winsock
TcpNumConnections 有效范围:0 - 0xfffffe
>>4.根据网友的反馈,在 SP_IocpDispatcher 中增加了一个 push 接口,以便 server 能更灵活地 push 信息给 client
在 SP_Dispatcher/SP_IocpDispatcher 中加了一个 push 方法。
具体的使用例子可以参考 testchat_d.cpp testiocpdispatcher.cpp 。
注意看 main 函数中的这一段
char buffer[ 256 ] = { 0 };
snprintf( buffer, sizeof( buffer ), "SYS : %d online\r\n", ++chatID );
SP_Message * msg = new SP_Message();
onlineSidList.copy( msg->getToList(), NULL );
msg->getMsg()->append( buffer );
SP_Sid_t sid = { SP_Sid_t::ePushKey, SP_Sid_t::ePushSeq };
SP_Response * response = new SP_Response( sid );
response->addMessage( msg );
dispatcher.push( response );
下面来看一个使用 spserver 实现的简单的 line echo server 。
在最简单的情况下,使用 spserver 实现一个 TCP server 需要实现两个类:SP_Handler 的子类 和 SP_HandlerFactory 的子类。
SP_Handler 的子类负责处理具体业务。
SP_HandlerFactory 的子类协助 spserver 为每一个连接创建一个 SP_Handler 子类实例。
1.SP_Handler 生命周期
SP_Handler 和 TCP 连接一对一,SP_Handler 的生存周期和 TCP 连接一样。
当 TCP 连接被接受之后,SP_Handler 被创建,当 TCP 连接断开之后,SP_Handler将被 destroy。
2.SP_Handler 函数说明
SP_Handler 有 5 个纯虚方法需要由子类来重载。这 5 个方法分别是:
start:当一个连接成功接受之后,将首先被调用。返回 0 表示继续,-1 表示结束连接。
handle:当一个请求包接收完之后,将被调用。返回 0 表示继续,-1 表示结束连接。
error:当在一个连接上读取或者发送数据出错时,将被调用。error 之后,连接将被关闭。
timeout:当一个连接在约定的时间内没有发生可读或者可写事件,将被调用。timeout 之后,连接将被关闭。
close:当一个 TCP 连接被关闭时,无论是正常关闭,还是因为 error/timeout 而关闭。
3.SP_Handler 函数调用时机
当需要调用 SP_Handler 的 start/handle/error/timeout 方法时,相关的参数将被放入队列,然后由线程池来负责执行 SP_Handler 对应的方法。因此在 start/handle/error/timeout 中可以使用同步操作来编程,可以直接使用阻塞型 I/O 。
在发生 error 和 timeout 事件之后,close 紧跟着这两个方法之后被调用。
如果是程序正常指示结束连接,那么在主线程中直接调用 close 方法。
4.高级功能--MsgDecoder
这个 line echo server 比起常见的 echo server 有一点不同:只有在读到一行时才进行 echo。
这个功能是通过一个称为 MsgDecoder 的接口来实现的。不同的 TCP server 在应用层的传输格式上各不相同。
比如在 SMTP/POP 这一类的协议中,大部分命令是使用 CRLF 作为分隔符的。而在 HTTP 中是使用 Header + Body 的形式。
为了适应不同的 TCP server,在 spserver 中有一个 MsgDecoder 接口,用来处理这些应用层的协议。
比如在这个 line echo server 中,把传输协议定义为:只有读到一行时将进行 echo 。
那么相应地就要实现一个 SP_LineMsgDecoder ,这个 LineMsgDecoder 负责判断目前的输入缓冲区中是否已经有完整的一行。
MsgDecoder 的接口如下:
decode 方法对 inBuffer 里面的数据进行检查,看是否符合特定的要求。如果已经符合要求,那么返回 eOK ;如果还不满足要求,那么返回 eMoreData。比如 LineMsgDecoder 的 decode 方法的实现为:
spserver 默认提供了几个 MsgDecoder 的实现:
SP_DefaultMsgDecoder :它的 decode 总是返回 eOK ,即只要有输入就当作是符合要求了。
如果应用不设置 SP_Request->setMsgDecoder 的话,默认使用这个。
SP_LineMsgDecoder : 检查到有一行的时候,返回 eOK ,按行读取输入。
SP_DotTermMsgDecoder :检查到输入中包含了特定的 <CRLF>.<CRLF> 时,返回 eOK。
具体的使用例子可以参考示例:testsmtp 。
5.高级功能--实现聊天室
spserver 还提供了一个广播消息的功能。使用消息广播功能可以方便地实现类似聊天室的功能。具体的实现可以参考示例:testchat 。
先参考http协议的实现,写出来一个文本协议,再考虑加密协议的实现。
SP_HttpRequestDecoder
先向解析器添加数据
SP_HttpMsgParser类型成员返回OK时表示完成解析。
int len = mParser->append( inBuffer->getBuffer(), inBuffer->getSize() );
inBuffer->erase( len );
return mParser->isCompleted() ? eOK : eMoreData;
int SP_HttpHandlerAdapter :: start( SP_Request * request, SP_Response * response )
{
request->setMsgDecoder( new SP_HttpRequestDecoder() );//在请求发起时设置解析器
//如果想控制不让某个IP连接上,通过判断reqeust->getClientIP()返回的IP是不是在黑名单里,如果是在,返回-1,则就不会建立连接了。
return 0;
}
int SP_HttpHandlerAdapter :: handle( SP_Request * request, SP_Response * response )
{
SP_HttpRequestDecoder * decoder = ( SP_HttpRequestDecoder * ) request->getMsgDecoder();//获取请求的解析器
SP_HttpRequest * httpRequest = ( SP_HttpRequest * ) decoder->getMsg();//获取请求结果,也就是协议里客户端发送的内容
httpRequest->setClinetIP( request->getClientIP() );//设置客户端IP。
SP_HttpResponse * httpResponse = new SP_HttpResponse();//分配一个回复
httpResponse->setVersion( httpRequest->getVersion() );
mHandler->handle( httpRequest, httpResponse );//内部处理请求,算是具体协议的实现
SP_Buffer * reply = response->getReply()->getMsg();
char buffer[ 512 ] = { 0 };
snprintf( buffer, sizeof( buffer ), "%s %i %s\r\n", httpResponse->getVersion(),
httpResponse->getStatusCode(), httpResponse->getReasonPhrase() );
reply->append( buffer );
// check keep alive header
if( httpRequest->isKeepAlive() ) {
if( NULL == httpResponse->getHeaderValue( SP_HttpMessage::HEADER_CONNECTION ) ) {
httpResponse->addHeader( SP_HttpMessage::HEADER_CONNECTION, "Keep-Alive" );
}
}
// check Content-Length header
httpResponse->removeHeader( SP_HttpMessage::HEADER_CONTENT_LENGTH );
if( httpResponse->getContentLength() > 0 ) {
snprintf( buffer, sizeof( buffer ), "%d", httpResponse->getContentLength() );
httpResponse->addHeader( SP_HttpMessage::HEADER_CONTENT_LENGTH, buffer );
}
// check date header
httpResponse->removeHeader( SP_HttpMessage::HEADER_DATE );
time_t tTime = time( NULL );
struct tm tmTime;
gmtime_r( &tTime, &tmTime );
strftime( buffer, sizeof( buffer ), "%a, %d %b %Y %H:%M:%S %Z", &tmTime );
httpResponse->addHeader( SP_HttpMessage::HEADER_DATE, buffer );
// check Content-Type header
if( NULL == httpResponse->getHeaderValue( SP_HttpMessage::HEADER_CONTENT_TYPE ) ) {
httpResponse->addHeader( SP_HttpMessage::HEADER_CONTENT_TYPE,
"text/html; charset=ISO-8859-1" );
}
// check Server header
httpResponse->removeHeader( SP_HttpMessage::HEADER_SERVER );
httpResponse->addHeader( SP_HttpMessage::HEADER_SERVER, "sphttp/spserver" );
for( int i = 0; i < httpResponse->getHeaderCount(); i++ ) {
snprintf( buffer, sizeof( buffer ), "%s: %s\r\n",
httpResponse->getHeaderName( i ), httpResponse->getHeaderValue( i ) );
reply->append( buffer );
}
reply->append( "\r\n" );
char keepAlive[ 32 ] = { 0 };
if( NULL != httpResponse->getHeaderValue( SP_HttpMessage::HEADER_CONNECTION ) ) {
strncpy( keepAlive, httpResponse->getHeaderValue(
SP_HttpMessage::HEADER_CONNECTION ), sizeof( keepAlive ) - 1 );
}
if( NULL != httpResponse->getContent() ) {
response->getReply()->getFollowBlockList()->append(
new SP_HttpResponseMsgBlock( httpResponse ) );
} else {
delete httpResponse;
}
request->setMsgDecoder( new SP_HttpRequestDecoder() );//再次设置解析器
return 0 == strcasecmp( keepAlive, "Keep-Alive" ) ? 0 : -1;//如果不是保持连接,则返回-1关闭。
}
void SP_HttpHandlerAdapter :: error( SP_Response * response )
{
mHandler->error();
}
void SP_HttpHandlerAdapter :: timeout( SP_Response * response )
{
mHandler->timeout();
}
void SP_HttpHandlerAdapter :: close()
{
}
源代码结束
SP_HttpHandler里没有handler函数,它的实现是在派生类里实现的。
一个接收连接线程:
sp_thread_result_t SP_THREAD_CALL SP_IocpServer :: acceptThread( void * arg )
{
DWORD recvBytes = 0;
SP_IocpAcceptArg_t * acceptArg = (SP_IocpAcceptArg_t*)arg;
for( ; ; ) {
acceptArg->mClientSocket = (HANDLE)WSASocket( AF_INET,
SOCK_STREAM,? IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED );
if( INVALID_SOCKET == (int)acceptArg->mClientSocket ) {
sp_syslog( LOG_ERR, "WSASocket fail, errno %d",
WSAGetLastError() );
Sleep( 50 );
continue;
}
SP_IOUtils::setNonblock( (int)acceptArg->mClientSocket );
memset( &( acceptArg->mOverlapped ), 0, sizeof( OVERLAPPED ) );
BOOL ret = AcceptEx( (SOCKET)acceptArg->mListenSocket, (SOCKET)acceptArg->mClientSocket,
acceptArg->mBuffer, 0, sizeof(struct sockaddr_in) + 16, sizeof(struct sockaddr_in) + 16,
&recvBytes, &( acceptArg->mOverlapped ) );
int lastError = WSAGetLastError();
if( FALSE == ret && (ERROR_IO_PENDING != lastError) ) {
sp_syslog( LOG_ERR, "AcceptEx() fail, errno %d", lastError );
closesocket( (int)acceptArg->mClientSocket );
if( WSAENOBUFS == lastError ) Sleep( 50 );
} else {
//等待另一个线程处理完连接
WaitForSingleObject( acceptArg->mAcceptEvent, INFINITE );
ResetEvent( acceptArg->mAcceptEvent );
}
}
return 0;
}
一个查询完成端口的线程(如果使用runForever就是调用线程,用run则另起线程):
int SP_IocpServer :: start()
{
...
SP_Executor actExecutor( 1, "act" );
SP_Executor workerExecutor( mMaxThreads, "work" );
SP_CompletionHandler * completionHandler = mHandlerFactory->createCompletionHandler();
/* Start the event loop. */
while( 0 == mIsShutdown ) {
SP_IocpEventCallback::eventLoop( &eventArg, &acceptArg );
for( ; NULL != eventArg.getInputResultQueue()->top(); ) {
SP_Task * task = (SP_Task*)eventArg.getInputResultQueue()->pop();
workerExecutor.execute( task );
}
for( ; NULL != eventArg.getOutputResultQueue()->top(); ) {
SP_Message * msg = (SP_Message*)eventArg.getOutputResultQueue()->pop();
void ** arg = ( void** )malloc( sizeof( void * ) * 2 );
arg[ 0 ] = (void*)completionHandler;
arg[ 1 ] = (void*)msg;
actExecutor.execute( outputCompleted, arg );
}
}
delete completionHandler;
sp_syslog( LOG_NOTICE, "Server is shutdown." );
sp_close( listenFD );
}
return ret;
}
另外还有两个线程池用于执行每个请求:
SP_Executor actExecutor( 1, "act" );
SP_Executor workerExecutor( mMaxThreads, "work" );
基本上框架就很明显了.
在 [url=http://code.google.com/p/spserver/]SPServer 中实现了 HSHA 和 LF 两种线程池。
目前的实现还是比较可读的,这两种线程池最主要的处理逻辑各自都被集中到了一个函数中。
先来看看 HSHA 的核心实现代码
http://spserver.googlecode.com/svn/trunk/spserver.cpp
int SP_Server :: start()
{
......
SP_Executor workerExecutor( mMaxThreads, "work" );
SP_Executor actExecutor( 1, "act" );
/* Start the event loop. */
while( 0 == mIsShutdown ) {
event_base_loop( eventArg.getEventBase(), EVLOOP_ONCE );
for( ; NULL != eventArg.getInputResultQueue()->top(); ) {
SP_Task * task = (SP_Task*)eventArg.getInputResultQueue()->pop();
workerExecutor.execute( task );
}
for( ; NULL != eventArg.getOutputResultQueue()->top(); ) {
SP_Message * msg = (SP_Message*)eventArg.getOutputResultQueue()->pop();
......
actExecutor.execute( outputCompleted, arg );
}
}
......
}
一些细节都被去掉了。从这段代码可以看到,HSHA 的处理流程是:
1.运行 start 函数的线程称为 event_loop 线程,负责 recv/send 。
所有的 recv/send 都在 event_base_loop 这个函数调用上完成的。
这个层就是属于异步层。
2. event_base_loop 在 recv 的时候,会调用 MsgDecoder.decode 函数,如果 decode 返回 OK ,说明完整地读入数据了,那么就把对应的数据放入 eventArg.mInputResultQueue 里面。
在 send 的时候,如果把一个 Message 完整地发送了, 那么就把这个 Message 放入 eventArg.mOutputResultQueue。 这两个就是队列,队列里面保存的数据一般称为完成事件。
3.workerExecutor 和 actExecutor 就是同步层。
由于完成事件的处理可能会涉及很复杂的处理,可能会使用到数据库或者其他, 因此不能直接使用 event_loop 线程,而是使用线程池。
这个就是同步层。
再来看 LF 的核心代码
http://spserver.googlecode.com/svn/trunk/splfserver.cpp
int SP_LFServer :: run()
{
......
mThreadPool = new SP_ThreadPool( mMaxThreads );
for( int i = 0; i < mMaxThreads; i++ ) {
mThreadPool->dispatch( lfHandler, this );
}
......
}
void SP_LFServer :: lfHandler( void * arg )
{
SP_LFServer * server = (SP_LFServer*)arg;
for( ; 0 == server->mIsShutdown; ) {
/* follower begin */
server->handleOneEvent();
}
}
void SP_LFServer :: handleOneEvent()
{
SP_Task * task = NULL;
SP_Message * msg = NULL;
/* follower wait */
pthread_mutex_lock( &mMutex );
/* follower end */
/* leader begin */
for( ; 0 == mIsShutdown && NULL == task && NULL == msg; ) {
if( mEventArg->getInputResultQueue()->getLength() > 0 ) {
task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
} else if( mEventArg->getOutputResultQueue()->getLength() > 0 ) {
msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();
}
if( NULL == task && NULL == msg ) {
event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );
}
}
/* leader end */
pthread_mutex_unlock( &mMutex );
/* worker begin */
if( NULL != task ) task->run();
if( NULL != msg ) mCompletionHandler->completionMessage( msg );
/* worker end */
}
在 run 函数中,启动线程池中的多个线程运行 lfHandler ,由 lfHandler 不断地运行 handleOneEvent 。
线程的角色转变在上面的注释中可以很清楚地看到。
由于这里的实现比较简单,LF 线程池的线程都是预先创建的, 并且没有实现线程池之外的线程可以加入的功能, 所以在 leader 切换为 worker,并且提升一个 follower 为 leader 的时候, 只需要用一个 mutex 就可以解决了。