锐英源软件
第一信赖

精通

英语

开源

擅长

开发

培训

胸怀四海 

第一信赖

当前位置:锐英源 / 学员作品 / 网授学员"美女重现人间"大型通信服务器开发
服务方向
软件开发
办公财务MIS
股票
设备监控
网页信息采集及控制
多媒体
软件开发培训
Java 安卓移动开发
Java Web开发
HTML5培训
iOS培训
网站前端开发
VC++
C++游戏开发培训
C#软件
C语言(Linux)
ASP.NET网站开发(C#)
C#软件+ASP.NET网站
SOCKET网络通信开发
COMOLE和ActiveX开发
C++(Linux)
汇编和破解
驱动开发
SkinMagicVC++换肤
流媒体开发
MicroStation二次开发
计算机英语翻译
联系方式
固话:0371-63888850
手机:138-0381-0136
Q Q:396806883
微信:ryysoft

简介

大并发高效率通信服务器的实现是工作中经常要遇到的情况,我们锐英源研究过IOCP方面不少开源架构,比如spserver和网游服务器IOCP模型,
2010年11月份,QQ上网友:“美女重现人间”所在公司“重庆西希科技”要进行相关的大并发高效率通信服务器开发,经过了解后,选择我们锐英源
进行技术合作公司,我们锐英源考察过后,采用spserver开源项目做为开发平台,极大地缩小了西希科技的开发周期,并且降低了开发难度保证了产
品质量。西希科技应用spserver主要是3点:
   1、线程内启动spserver;
   2、类似文本聊天协议架构下的广播发送;
   3、指定单用户发送,这点是修改spserver架构才实现的功能,下面会有此功能的使用演示;

下面是合作协议图片:

合作协议

修改后spserver及指定单用户发送源代码下载

单用户发送使用演示

1、启动服务器
请先编译工程里的spserver工程,生成spserver.lib,来避免编译其它工程时出的:cannot open file "spserver.lib"这样的错误。
   执行压缩包里的diatest工程对应的可执行文件,会启动一个对话框,在启动时,会创建个线程来启动spserver聊天iocp服务器,服务器监听
5555端口。

下面是启动界面的主窗口:
启动界面的主窗口
使用netstat可以看到端口监听的情况,使用命令netstat -a -p tcp可以看到监听上了5555端口。
在监听成功后,可以用telnet来模拟客户端,下面是telnet登录上的窗口:
telnet登录
下面是点击主窗口上“发包测试”后的窗口情况 :
发包测试
上面的"shw test"文本就是单用户测试发包的结果。
为了实现上述功能,我们锐英源通过源代码理解建立了思路,下面先说源代码理解过程:

spserver架构修改线索

SP_Sid_t sid = { SP_Sid_t::ePushKey, SP_Sid_t::ePushSeq };
SP_Response * response = new SP_Response( sid );
response->addMessage( msg );
必须还要添加到
dispatcher.push( response );

return mEventArg->getResponseQueue()->push( response );

static BOOL addSend( SP_Session * session );

关键是用到了SP_IocpDispatcher,派发者

void SP_IocpEventHelper :: worker( void * arg )
{
SP_Session * session = (SP_Session*)arg;
SP_Handler * handler = session->getHandler();
SP_IocpSession_t * iocpSession = (SP_IocpSession_t*)session->getArg();
SP_IocpEventArg * eventArg = iocpSession->mEventArg;

SP_Response * response = new SP_Response( session->getSid() );
if( 0 != handler->handle( session->getRequest(), response ) ) {
session->setStatus( SP_Session::eWouldExit );
}

session->setRunning( 0 );

eventArg->getResponseQueue()->push( response );
}

改造服务器类,把event指针暴露出来

在得到上面结论后,形成了下面的修改思路

指定用户发送思路

所有的添加代码都带有注释//by shw 20101105 向指定客户端发消息处理
1、暴露出事件参数,可以向响应队列里发响应。
影响文件spiocpserver.hpp,spiocpserver.cpp。这2个文件是spserver的主要文件,覆盖过后要重新编译spserver库。
2、修改ChatHandler,让直接可以发包

大家看上面代码可能会有所疑惑,请参考下面我们对spserver的总结来理解。

spserver基本说明

spserver的下载链接为
http://www.codeproject.com/KB/IP/IOCP_Server_Framework.aspx
在搜索引擎上查找发向使用者比较多

怎样测试?

  1. 编译

源代码下载来后,解压,用VC6打开win32目录下的spserver.dsw文件,先编译spserver工程,再编译testiocpchat。

  1. 执行testiocpchat。这个程序是一个群发聊天软件,任何一个客户端说的话,所有其它客户端都会收到。发送quit是退出。
  2. 模拟大量客户端,连接服务器的5555端口。每200毫秒发一句话,看并发量怎样。
  3. 如果不知道客户端怎样编写,请使用telnet ip 5555命令来模拟聊天过程就可以了。

至于怎样写iocp通信服务器,可以参考testiocpchat工程,添加一些handler就可以了。

获取信息,回复信息。
防DDOS,一个IP多客户端。

自带的测试工具使用说明

SPServer 自带了一套用于压力测试的程序。
在 windows 平台是 testiocpecho.cpp 和 testiocpstress.cpp 。使用方法如下:
1)在一个 console 启动 testiocpecho.exe
E:\spserver-0.9.2\win32\testiocpecho\Debug>.\testiocpecho.exe
#1728 server type lf
#1728 Listen on port [3333]
#1728 Thread #2868 has been created to accept socket
#1728 [tp@unknown] create thread#4028
#1728 [tp@unknown] create thread#2492
#1728 [tp@unknown] create thread#1076
#1728 [tp@unknown] create thread#3204
2) 在另一个 console 启动 testiocpstress.exe
E:\spserver-0.9.2\win32\testiocpstress\Debug>.\testiocpstress.exe -c 100 -m 100
这个程序支持一些命令行参数,-c 用户指定模拟多少个 client ,-m 用于指定每个 client 发送多少条信息。
在一台有 512M 内存的 windows xp home edition 机器上,两个程序都在本机运行,可以稳定运行 5000 的并发连接。
如果上到 10K 的连接,在测试一段时间之后,会开始出现 10055 的错误。
限于目前没有更高配置的机器,没有办法做更大并发的测试。如果有人有兴趣,可以帮忙做一下测试。
===========================================================================
在 Unix/Linux 下是 testecho.cpp 和 teststress.cpp 。使用方法如下:
1) 启动 testecho
bash-2.05a$ ./testecho
testecho[15635]: Listen on port [3333]
testecho[15635]: [ex@work] Thread #1026 has been created for executor
testecho[15635]: [ex@act] Thread #2051 has been created for executor
2) 执行 teststress
bash-2.05a$ ./teststress  -c 100 -m 1000
这个命令行指定模拟 100 个 client ,每个 client 发送 1000 个信息。

修改客户端发起限制

在 windows xp home edition sp2 上改了两个参数。其他版本的 windows 还没试过
SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Winsock
MaxUserPort   REG_DWORD   5000-65534(十进制)(默认值0x1388--十进制为5000)
SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Winsock
TcpNumConnections 有效范围:0 - 0xfffffe

怎样向指定客户端发消息(注意带下划线的代码)

>>4.根据网友的反馈,在 SP_IocpDispatcher 中增加了一个 push 接口,以便 server 能更灵活地 push 信息给 client
在 SP_Dispatcher/SP_IocpDispatcher 中加了一个 push 方法。
具体的使用例子可以参考 testchat_d.cpp testiocpdispatcher.cpp 。
注意看 main 函数中的这一段
char buffer[ 256 ] = { 0 };
snprintf( buffer, sizeof( buffer ), "SYS : %d online\r\n", ++chatID );
SP_Message * msg = new SP_Message();
onlineSidList.copy( msg->getToList(), NULL );
msg->getMsg()->append( buffer );
SP_Sid_t sid = { SP_Sid_t::ePushKey, SP_Sid_t::ePushSeq };
SP_Response * response = new SP_Response( sid );
response->addMessage( msg );
dispatcher.push( response );

代码例子及分析

下面来看一个使用 spserver 实现的简单的 line echo server 。

  1. class SP_EchoHandler : public SP_Handler {   
  2. public:   
  3.   SP_EchoHandler(){}   
  4.   virtual ~SP_EchoHandler(){}   
  5.   
  6.   // return -1 : terminate session, 0 : continue   
  7.   virtual int start( SP_Request * request, SP_Response * response ) {   
  8.     request->setMsgDecoder( new SP_LineMsgDecoder() );   
  9.     response->getReply()->getMsg()->append(   
  10.       "Welcome to line echo server, enter 'quit' to quit.\r\n" );   
  11.   
  12.     return 0;      
  13.   }        
  14.   
  15.   // return -1 : terminate session, 0 : continue   
  16.   virtual int handle( SP_Request * request, SP_Response * response ) {   
  17.     SP_LineMsgDecoder * decoder = (SP_LineMsgDecoder*)request->getMsgDecoder();   
  18.   
  19.     if( 0 != strcasecmp( (char*)decoder->getMsg(), "quit" ) ) {   
  20.       response->getReply()->getMsg()->append( (char*)decoder->getMsg() );   
  21.       response->getReply()->getMsg()->append( "\r\n" );   
  22.       return 0;            
  23.     } else {       
  24.       response->getReply()->getMsg()->append( "Byebye\r\n" );   
  25.       return -1;           
  26.     }              
  27.   }        
  28.   virtual void error( SP_Response * response ) {}   
  29.   
  30.   virtual void timeout( SP_Response * response ) {}   
  31.   
  32.   virtual void close() {}   
  33. };   
  34.   
  35. class SP_EchoHandlerFactory : public SP_HandlerFactory {   
  36. public:   
  37.   SP_EchoHandlerFactory() {}   
  38.   virtual ~SP_EchoHandlerFactory() {}   
  39.   
  40.   virtual SP_Handler * create() const {   
  41.     return new SP_EchoHandler();   
  42.   }   
  43. };   
  44.   
  45. int main( int argc, char * argv[] )   
  46. {   
  47.   int port = 3333;   
  48.   
  49.   SP_Server server( "", port, new SP_EchoHandlerFactory() );   
  50.   server.runForever();   
  51.   
  52.   return 0;   
  53. }  



在最简单的情况下,使用 spserver 实现一个 TCP server 需要实现两个类:SP_Handler 的子类 和 SP_HandlerFactory 的子类。
SP_Handler 的子类负责处理具体业务。
SP_HandlerFactory 的子类协助 spserver 为每一个连接创建一个 SP_Handler 子类实例。

1.SP_Handler 生命周期
SP_Handler 和 TCP 连接一对一,SP_Handler 的生存周期和 TCP 连接一样。
当 TCP 连接被接受之后,SP_Handler 被创建,当 TCP 连接断开之后,SP_Handler将被 destroy。

2.SP_Handler 函数说明
SP_Handler 有 5 个纯虚方法需要由子类来重载。这 5 个方法分别是:
start:当一个连接成功接受之后,将首先被调用。返回 0 表示继续,-1 表示结束连接。
handle:当一个请求包接收完之后,将被调用。返回 0 表示继续,-1 表示结束连接。
error:当在一个连接上读取或者发送数据出错时,将被调用。error 之后,连接将被关闭。
timeout:当一个连接在约定的时间内没有发生可读或者可写事件,将被调用。timeout 之后,连接将被关闭。
close:当一个 TCP 连接被关闭时,无论是正常关闭,还是因为 error/timeout 而关闭。

3.SP_Handler 函数调用时机
当需要调用 SP_Handler 的 start/handle/error/timeout 方法时,相关的参数将被放入队列,然后由线程池来负责执行 SP_Handler 对应的方法。因此在 start/handle/error/timeout 中可以使用同步操作来编程,可以直接使用阻塞型 I/O 。
在发生 error 和 timeout 事件之后,close 紧跟着这两个方法之后被调用。
如果是程序正常指示结束连接,那么在主线程中直接调用 close 方法。

4.高级功能--MsgDecoder
这个 line echo server 比起常见的 echo server 有一点不同:只有在读到一行时才进行 echo。
这个功能是通过一个称为 MsgDecoder 的接口来实现的。不同的 TCP server 在应用层的传输格式上各不相同。
比如在 SMTP/POP 这一类的协议中,大部分命令是使用 CRLF 作为分隔符的。而在 HTTP 中是使用 Header + Body 的形式。
为了适应不同的 TCP server,在 spserver 中有一个 MsgDecoder 接口,用来处理这些应用层的协议。
比如在这个 line echo server 中,把传输协议定义为:只有读到一行时将进行 echo 。
那么相应地就要实现一个 SP_LineMsgDecoder ,这个 LineMsgDecoder 负责判断目前的输入缓冲区中是否已经有完整的一行。

MsgDecoder 的接口如下:

  1. class SP_MsgDecoder {   
  2. public:   
  3.   virtual ~SP_MsgDecoder();   
  4.   
  5.   enum { eOK, eMoreData };   
  6.   virtual int decode( SP_Buffer * inBuffer ) = 0;   
  7. };  



decode 方法对 inBuffer 里面的数据进行检查,看是否符合特定的要求。如果已经符合要求,那么返回 eOK ;如果还不满足要求,那么返回 eMoreData。比如 LineMsgDecoder 的 decode 方法的实现为:

  1. int SP_LineMsgDecoder :: decode( SP_Buffer * inBuffer )   
  2. {                  
  3.   if( NULL != mLine ) free( mLine );   
  4.   mLine = inBuffer->getLine();   
  5.            
  6.   return NULL == mLine ? eMoreData : eOK;   
  7. }     



spserver 默认提供了几个 MsgDecoder 的实现:
SP_DefaultMsgDecoder :它的 decode 总是返回 eOK ,即只要有输入就当作是符合要求了。
    如果应用不设置 SP_Request->setMsgDecoder 的话,默认使用这个。
SP_LineMsgDecoder : 检查到有一行的时候,返回 eOK ,按行读取输入。
SP_DotTermMsgDecoder :检查到输入中包含了特定的 <CRLF>.<CRLF> 时,返回 eOK。

具体的使用例子可以参考示例:testsmtp 。

5.高级功能--实现聊天室
spserver 还提供了一个广播消息的功能。使用消息广播功能可以方便地实现类似聊天室的功能。具体的实现可以参考示例:testchat 。

获取信息回复信息多个例子分析

先参考http协议的实现,写出来一个文本协议,再考虑加密协议的实现。

HTTP协议

SP_HttpRequestDecoder
先向解析器添加数据
SP_HttpMsgParser类型成员返回OK时表示完成解析。
int len = mParser->append( inBuffer->getBuffer(), inBuffer->getSize() );

inBuffer->erase( len );

return mParser->isCompleted() ? eOK : eMoreData;

int SP_HttpHandlerAdapter :: start( SP_Request * request, SP_Response * response )
{
request->setMsgDecoder( new SP_HttpRequestDecoder() );//在请求发起时设置解析器
//如果想控制不让某个IP连接上,通过判断reqeust->getClientIP()返回的IP是不是在黑名单里,如果是在,返回-1,则就不会建立连接了。
return 0;
}

int SP_HttpHandlerAdapter :: handle( SP_Request * request, SP_Response * response )
{
SP_HttpRequestDecoder * decoder = ( SP_HttpRequestDecoder * ) request->getMsgDecoder();//获取请求的解析器
SP_HttpRequest * httpRequest = ( SP_HttpRequest * ) decoder->getMsg();//获取请求结果,也就是协议里客户端发送的内容

httpRequest->setClinetIP( request->getClientIP() );//设置客户端IP。

SP_HttpResponse * httpResponse = new SP_HttpResponse();//分配一个回复
httpResponse->setVersion( httpRequest->getVersion() );

mHandler->handle( httpRequest, httpResponse );//内部处理请求,算是具体协议的实现

SP_Buffer * reply = response->getReply()->getMsg();

char buffer[ 512 ] = { 0 };
snprintf( buffer, sizeof( buffer ), "%s %i %s\r\n", httpResponse->getVersion(),
httpResponse->getStatusCode(), httpResponse->getReasonPhrase() );
reply->append( buffer );

// check keep alive header
if( httpRequest->isKeepAlive() ) {
if( NULL == httpResponse->getHeaderValue( SP_HttpMessage::HEADER_CONNECTION ) ) {
httpResponse->addHeader( SP_HttpMessage::HEADER_CONNECTION, "Keep-Alive" );
}
}

// check Content-Length header
httpResponse->removeHeader( SP_HttpMessage::HEADER_CONTENT_LENGTH );
if( httpResponse->getContentLength() > 0 ) {
snprintf( buffer, sizeof( buffer ), "%d", httpResponse->getContentLength() );
httpResponse->addHeader( SP_HttpMessage::HEADER_CONTENT_LENGTH, buffer );
}

// check date header
httpResponse->removeHeader( SP_HttpMessage::HEADER_DATE );
time_t tTime = time( NULL );
struct tm tmTime;
gmtime_r( &tTime, &tmTime );
strftime( buffer, sizeof( buffer ), "%a, %d %b %Y %H:%M:%S %Z", &tmTime );
httpResponse->addHeader( SP_HttpMessage::HEADER_DATE, buffer );

// check Content-Type header
if( NULL == httpResponse->getHeaderValue( SP_HttpMessage::HEADER_CONTENT_TYPE ) ) {
httpResponse->addHeader( SP_HttpMessage::HEADER_CONTENT_TYPE,
"text/html; charset=ISO-8859-1" );
}

// check Server header
httpResponse->removeHeader( SP_HttpMessage::HEADER_SERVER );
httpResponse->addHeader( SP_HttpMessage::HEADER_SERVER, "sphttp/spserver" );

for( int i = 0; i < httpResponse->getHeaderCount(); i++ ) {
snprintf( buffer, sizeof( buffer ), "%s: %s\r\n",
httpResponse->getHeaderName( i ), httpResponse->getHeaderValue( i ) );
reply->append( buffer );
}

reply->append( "\r\n" );

char keepAlive[ 32 ] = { 0 };
if( NULL != httpResponse->getHeaderValue( SP_HttpMessage::HEADER_CONNECTION ) ) {
strncpy( keepAlive, httpResponse->getHeaderValue(
SP_HttpMessage::HEADER_CONNECTION ), sizeof( keepAlive ) - 1 );
}

if( NULL != httpResponse->getContent() ) {
response->getReply()->getFollowBlockList()->append(
new SP_HttpResponseMsgBlock( httpResponse ) );
} else {
delete httpResponse;
}

request->setMsgDecoder( new SP_HttpRequestDecoder() );//再次设置解析器

return 0 == strcasecmp( keepAlive, "Keep-Alive" ) ? 0 : -1;//如果不是保持连接,则返回-1关闭。
}

void SP_HttpHandlerAdapter :: error( SP_Response * response )
{
mHandler->error();
}

void SP_HttpHandlerAdapter :: timeout( SP_Response * response )
{
mHandler->timeout();
}

void SP_HttpHandlerAdapter :: close()
{
}

源代码结束

SP_HttpHandler里没有handler函数,它的实现是在派生类里实现的。

setReqQueueSize

线程理解

一个接收连接线程:
sp_thread_result_t SP_THREAD_CALL SP_IocpServer :: acceptThread( void * arg )
{
DWORD recvBytes = 0;

SP_IocpAcceptArg_t * acceptArg = (SP_IocpAcceptArg_t*)arg;

for( ; ; ) {
acceptArg->mClientSocket = (HANDLE)WSASocket( AF_INET,
          SOCK_STREAM,? IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED );
if( INVALID_SOCKET == (int)acceptArg->mClientSocket ) {
sp_syslog( LOG_ERR, "WSASocket fail, errno %d",
                  WSAGetLastError() );
Sleep( 50 );
continue;
}

SP_IOUtils::setNonblock( (int)acceptArg->mClientSocket );
memset( &( acceptArg->mOverlapped ), 0, sizeof( OVERLAPPED ) );

BOOL ret = AcceptEx( (SOCKET)acceptArg->mListenSocket, (SOCKET)acceptArg->mClientSocket,
acceptArg->mBuffer, 0, sizeof(struct sockaddr_in) + 16, sizeof(struct sockaddr_in) + 16,
&recvBytes, &( acceptArg->mOverlapped ) );

int lastError = WSAGetLastError();
if( FALSE == ret && (ERROR_IO_PENDING != lastError) ) {
sp_syslog( LOG_ERR, "AcceptEx() fail, errno %d", lastError );
closesocket( (int)acceptArg->mClientSocket );
if( WSAENOBUFS == lastError ) Sleep( 50 );
} else {
//等待另一个线程处理完连接
WaitForSingleObject( acceptArg->mAcceptEvent, INFINITE );
ResetEvent( acceptArg->mAcceptEvent );
}
}
return 0;
}

一个查询完成端口的线程(如果使用runForever就是调用线程,用run则另起线程):
int SP_IocpServer :: start()
{
...

SP_Executor actExecutor( 1, "act" );
SP_Executor workerExecutor( mMaxThreads, "work" );
SP_CompletionHandler * completionHandler = mHandlerFactory->createCompletionHandler();

/* Start the event loop. */
while( 0 == mIsShutdown ) {
SP_IocpEventCallback::eventLoop( &eventArg, &acceptArg );

for( ; NULL != eventArg.getInputResultQueue()->top(); ) {
SP_Task * task = (SP_Task*)eventArg.getInputResultQueue()->pop();
workerExecutor.execute( task );
}

for( ; NULL != eventArg.getOutputResultQueue()->top(); ) {
SP_Message * msg = (SP_Message*)eventArg.getOutputResultQueue()->pop();

void ** arg = ( void** )malloc( sizeof( void * ) * 2 );
arg[ 0 ] = (void*)completionHandler;
arg[ 1 ] = (void*)msg;

actExecutor.execute( outputCompleted, arg );
}
}
delete completionHandler;
sp_syslog( LOG_NOTICE, "Server is shutdown." );
sp_close( listenFD );
}
return ret;

另外还有两个线程池用于执行每个请求:
SP_Executor actExecutor( 1, "act" );
SP_Executor workerExecutor( mMaxThreads, "work" );

基本上框架就很明显了.

架构分析

在 [url=http://code.google.com/p/spserver/]SPServer 中实现了 HSHA 和 LF 两种线程池。

目前的实现还是比较可读的,这两种线程池最主要的处理逻辑各自都被集中到了一个函数中。

先来看看 HSHA 的核心实现代码

http://spserver.googlecode.com/svn/trunk/spserver.cpp

int SP_Server :: start()
{
......
SP_Executor workerExecutor( mMaxThreads, "work" );

SP_Executor actExecutor( 1, "act" );

/* Start the event loop. */
while( 0 == mIsShutdown ) {
event_base_loop( eventArg.getEventBase(), EVLOOP_ONCE );
for( ; NULL != eventArg.getInputResultQueue()->top(); ) {
SP_Task * task = (SP_Task*)eventArg.getInputResultQueue()->pop();
workerExecutor.execute( task );
}

for( ; NULL != eventArg.getOutputResultQueue()->top(); ) {
SP_Message * msg = (SP_Message*)eventArg.getOutputResultQueue()->pop();

......
actExecutor.execute( outputCompleted, arg );
}
}


......

}



一些细节都被去掉了。从这段代码可以看到,HSHA 的处理流程是:
1.运行 start 函数的线程称为 event_loop 线程,负责 recv/send 。
  所有的 recv/send 都在 event_base_loop 这个函数调用上完成的。
  这个层就是属于异步层。
2. event_base_loop 在 recv 的时候,会调用 MsgDecoder.decode 函数,如果 decode 返回 OK ,说明完整地读入数据了,那么就把对应的数据放入   eventArg.mInputResultQueue 里面。
  在 send 的时候,如果把一个 Message 完整地发送了,   那么就把这个 Message 放入 eventArg.mOutputResultQueue。   这两个就是队列,队列里面保存的数据一般称为完成事件。
3.workerExecutor 和 actExecutor 就是同步层。
  由于完成事件的处理可能会涉及很复杂的处理,可能会使用到数据库或者其他, 因此不能直接使用 event_loop 线程,而是使用线程池。
  这个就是同步层。

再来看 LF 的核心代码

http://spserver.googlecode.com/svn/trunk/splfserver.cpp

int SP_LFServer :: run()
{
......
mThreadPool = new SP_ThreadPool( mMaxThreads );
for( int i = 0; i < mMaxThreads; i++ ) {
mThreadPool->dispatch( lfHandler, this );
}

......

}

void SP_LFServer :: lfHandler( void * arg )
{
SP_LFServer * server = (SP_LFServer*)arg;
for( ; 0 == server->mIsShutdown; ) {
/* follower begin */
server->handleOneEvent();
}
}
void SP_LFServer :: handleOneEvent()
{
SP_Task * task = NULL;
SP_Message * msg = NULL;
/* follower wait */
pthread_mutex_lock( &mMutex );
/* follower end */
/* leader begin */
for( ; 0 == mIsShutdown && NULL == task && NULL == msg; ) {
if( mEventArg->getInputResultQueue()->getLength() > 0 ) {
task = (SP_Task*)mEventArg->getInputResultQueue()->pop();
} else if( mEventArg->getOutputResultQueue()->getLength() > 0 ) {
msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();
}

if( NULL == task && NULL == msg ) {
event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );
}
}
/* leader end */
pthread_mutex_unlock( &mMutex );


/* worker begin */
if( NULL != task ) task->run();
if( NULL != msg ) mCompletionHandler->completionMessage( msg );
/* worker end */
}



在 run 函数中,启动线程池中的多个线程运行 lfHandler ,由 lfHandler 不断地运行 handleOneEvent 。
线程的角色转变在上面的注释中可以很清楚地看到。

由于这里的实现比较简单,LF 线程池的线程都是预先创建的, 并且没有实现线程池之外的线程可以加入的功能, 所以在 leader 切换为 worker,并且提升一个 follower 为 leader 的时候, 只需要用一个 mutex 就可以解决了。

友情链接
版权所有 Copyright(c)2004-2015 锐英源软件
公司注册号:410105000449586 豫ICP备08007559号 最佳分辨率 1024*768
地址:郑州市文化路47号院1号楼4层(47-1楼位于文化路和红专路十字路口东北角,郑州大学工学院招待所南边,工学院科技报告厅西边。)