锐英源软件
第一信赖

精通

英语

开源

擅长

开发

培训

胸怀四海 

第一信赖

当前位置:锐英源 / 开源技术 / 基于UDP的数据传输协议UDT / UDT源码剖析 UDT的GC线程相关过程代码注释
服务方向
人工智能数据处理
人工智能培训
kaldi数据准备
小语种语音识别
语音识别标注
语音识别系统
语音识别转文字
kaldi开发技术服务
软件开发
运动控制卡上位机
机械加工软件
软件开发培训
Java 安卓移动开发
VC++
C#软件
汇编和破解
驱动开发
联系方式
固话:0371-63888850
手机:138-0381-0136
Q Q:396806883
微信:ryysoft

UDT源码剖析(五):UDT的GC线程相关过程代码注释 .


随着UDT::Startup()的启动,GC线程也随之运行了。

GC主要关注的就是UDTSocket的释放,下面我们来看看这个GC线程是怎么实现的。


1 #ifndef WIN32

2   void* CUDTUnited::garbageCollect(void* p)

3 #else

4   DWORD WINAPI CUDTUnited::garbageCollect(LPVOID p)

5 #endif

6 {

7   // 获得当前GC对应的Socket单位

8   CUDTUnited* self = (CUDTUnited*)p;

9

10   // 应用锁保护,这里的锁是CUDTUnited::m_GCStopLock

11   CGuard gcguard(self->m_GCStopLock);

12

13   // 如果Socket单位没有标记为关闭中,那么GC持续生效

14   while (!self->m_bClosing)

15   {

16      self->checkBrokenSockets();

17

18      #ifdef WIN32

19         self->checkTLSValue();

20      #endif

21

22      // 这里处理超时解锁,windows下的WaitForSingleObject函数直接输入超时时间间隔

23      // 而非windows的需要提供绝对时间,所以需要先根据当前时间得到超时时间,然后作为参数传入pthread_cond_timedwait

24      // 实际上就是当计时器用,确保两次检查的时间间隔最多为1秒钟

25      #ifndef WIN32

26         timeval now;

27         timespec timeout;

28         gettimeofday(&now, 0);

29         timeout.tv_sec = now.tv_sec + 1;

30         timeout.tv_nsec = now.tv_usec * 1000;

31

32         pthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);

33      #else

34         WaitForSingleObject(self->m_GCStopCond, 1000);

35      #endif

36   }

37

38   // 到这里说明Socket单位处于正在关闭的状态

39

40   // remove all sockets and multiplexers

41   // 移除所有的Sockets和多路复用器

42

43   // 启动锁保护,这里不直接用CGuard是因为要多次用到这个锁,并且在下面的checkBrokenSockets中也要使用该锁,所以手动处理

44   CGuard::enterCS(self->m_ControlLock);

45   for (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i)

46   {

47      // 标记作废,调用关闭,并标记为CLOSED,更新时间戳,将Socket对象加入已关闭列表

48      i->second->m_pUDT->m_bBroken = true;

49      i->second->m_pUDT->close();

50      i->second->m_Status = CLOSED;

51      i->second->m_TimeStamp = CTimer::getTime();

52      self->m_ClosedSockets[i->first] = i->second;

53

54      // remove from listener's queue

55      // 从监听列表中移除

56      map<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket);

57      if (ls == self->m_Sockets.end())

58      {

59          // 如果在Socket列表和已关闭列表中都未找到监听Socket,则不作处理

60         ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);

61         if (ls == self->m_ClosedSockets.end())

62            continue;

63      }

64

65      // 运行到这里说明找到了监听Socket

66      // 从监听Socket的待Accept及已Accept列表中移除当前Socket

67      CGuard::enterCS(ls->second->m_AcceptLock);

68       ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);

69      ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);

70      CGuard::leaveCS(ls->second->m_AcceptLock);

71   }

72   // 已移除所有Socket项目,清空列表

73   self->m_Sockets.clear();

74

75   // 将已关闭列表中的所有项时间戳标记为0

76   for (map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j)

77   {

78      j->second->m_TimeStamp = 0;

79   }

80

81   // 手动解锁

82   CGuard::leaveCS(self->m_ControlLock);

83

84   while (true)

85   {

86      // 检查作废Socket,直到已关闭Socket列表为空

87      self->checkBrokenSockets();

88

89      CGuard::enterCS(self->m_ControlLock);

90      bool empty = self->m_ClosedSockets.empty();

91      CGuard::leaveCS(self->m_ControlLock);

92

93      if (empty)

94         break;

95

96      CTimer::sleep();

97   }

98

99   #ifndef WIN32

100      return NULL;

101   #else

102      return 0;

103   #endif

104 }


1 void CUDTUnited::checkBrokenSockets()

2 {

3   CGuard cg(m_ControlLock);

4

5   // set of sockets To Be Closed and To Be Removed

6   // tbc要关闭的Socket列表

7   // tbr要移除的Socket列表

8   vector<UDTSOCKET> tbc;

9   vector<UDTSOCKET> tbr;

10

11   // 循环单元中所有Socket

12   for (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i)

13   {

14      // check broken connection

15      // 检查状态是否为已作废

16      if (i->second->m_pUDT->m_bBroken)

17      {

18         if (i->second->m_Status == LISTENING)

19         {

20            // for a listening socket, it should wait an extra 3 seconds in case a client is connecting

21            // 如果该Socket是一个监听Socket,那么需要在作废后3秒钟再处理该Socket,因为可能有客户端正在连接

22            if (CTimer::getTime() - i->second->m_TimeStamp < 3000000)

23               continue;

24         }

25          else if ((i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0))

26         {

27            // if there is still data in the receiver buffer, wait longer

28            // 如果接收缓存中还有数据,并且检查计数大于0,则暂不处理

29            // 检查计数m_iBrokenCounter,在通常情况下为30,调用shutdown时为60,可以理解为30秒、60秒

30            continue;

31         }

32

33         //close broken connections and start removal timer

34         // 关闭作废的连接,并移除计时器

35         // 标记状态为CLOSED,更新时间戳为当前,将Socket加入要关闭的Socket列表

36         i->second->m_Status = CLOSED;

37         i->second->m_TimeStamp = CTimer::getTime();

38         tbc.push_back(i->first);

39         m_ClosedSockets[i->first] = i->second;

40

41         // remove from listener's queue

42         // 从监听列表中移除

43         map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket);

44         if (ls == m_Sockets.end())

45         {

46             // 如果在Socket列表和已关闭列表中都未找到监听Socket,则不作处理

47            ls = m_ClosedSockets.find(i->second->m_ListenSocket);

48            if (ls == m_ClosedSockets.end())

49               continue;

50         }

51

52         // 运行到这里说明找到了监听Socket

53         // 从监听Socket的待Accept及已Accept列表中移除当前Socket

54         CGuard::enterCS(ls->second->m_AcceptLock);

55         ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);

56         ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);

57         CGuard::leaveCS(ls->second->m_AcceptLock);

58      }

59   }

60

61   // 遍历已关闭Socket列表中的项

62   for (map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j)

63   {

64      // 有犹豫时间

65      if (j->second->m_pUDT->m_ullLingerExpiration > 0)

66      {

67         // asynchronous close:

68         // 异步关闭

69         // 关闭条件为:发送列表已经为空,或者发送列表的当前缓存大小为0,或者超出犹豫时间

70         if ((NULL == j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_ullLingerExpiration <= CTimer::getTime()))

71         {

72            j->second->m_pUDT->m_ullLingerExpiration = 0;

73            j->second->m_pUDT->m_bClosing = true;

74            j->second->m_TimeStamp = CTimer::getTime();

75         }

76      }

77

78      // timeout 1 second to destroy a socket AND it has been removed from RcvUList

79      // Socket被标记时间戳后1秒钟,并且已经从接收列表节点中移除,那么把它放入可移除列表

80      if ((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList))

81      {

82         tbr.push_back(j->first);

83      }

84   }

85

86   // move closed sockets to the ClosedSockets structure

87   // 将tbc可关闭Socket列表中的对象从当前Socket列表中移除,这里作者的注释有问题

88   // 实际上在上面m_ClosedSockets[i->first] = i->second;已经把这些Socket添加到ClosedSockets中了

89   for (vector<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)

90      m_Sockets.erase(*k);

91

92   // remove those timeout sockets

93   // 对tbr可移除Socket列表中的项执行removeSocket

94   for (vector<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)

95      removeSocket(*l);

96 }


1 void CUDTUnited::removeSocket(const UDTSOCKET u)

2 {

3   // 可移除的Socket必须在m_ClosedSockets列表中存在

4   map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u);

5

6   // invalid socket ID

7   // 如果不存在则不予移除

8   if (i == m_ClosedSockets.end())

9      return;

10

11   // decrease multiplexer reference count, and remove it if necessary

12   // 获取复用器ID

13   const int mid = i->second->m_iMuxID;

14

15   // 如果待Accept队列不为空

16   if (NULL != i->second->m_pQueuedSockets)

17   {

18      CGuard::enterCS(i->second->m_AcceptLock);

19

20      // if it is a listener, close all un-accepted sockets in its queue and remove them later

21      // 如果当前Socket是一个监听Socket,就要关闭所有尚未Accept的连接并在稍后将它们移除

22      for (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q)

23      {

24         m_Sockets[*q]->m_pUDT->m_bBroken = true;

25         m_Sockets[*q]->m_pUDT->close();

26         m_Sockets[*q]->m_TimeStamp = CTimer::getTime();

27         m_Sockets[*q]->m_Status = CLOSED;

28         m_ClosedSockets[*q] = m_Sockets[*q];

29         m_Sockets.erase(*q);

30      }

31

32      CGuard::leaveCS(i->second->m_AcceptLock);

33   }

34

35   // remove from peer rec

36   // 从m_PeerRec中移除该Socket信息

37   // m_PeerRec用来记录连接节点信息以防止重复的连接请求,必须在关闭时移除以保证下次连接不会被拒绝

38   map<int64_t, set<UDTSOCKET> >::iterator j = m_PeerRec.find((i->second->m_PeerID << 30) + i->second->m_iISN);

39   if (j != m_PeerRec.end())

40   {

41      j->second.erase(u);

42      if (j->second.empty())

43         m_PeerRec.erase(j);

44   }

45

46   // delete this one

47   // 关闭、删除当前Socket对象,并从m_ClosedSockets列表中移除

48   i->second->m_pUDT->close();

49   delete i->second;

50   m_ClosedSockets.erase(i);

51

52   // 从复用器列表中查找当前的复用器

53   map<int, CMultiplexer>::iterator m;

54   m = m_mMultiplexer.find(mid);

55   if (m == m_mMultiplexer.end())

56   {

57      //something is wrong!!!

58      // 如果没找到肯定有什么地方搞错了,但是这里没做处理

59      return;

60   }

61

62   // 把当前复用器所对应的UDT计数减一

63   // 如果减至0,说明已经没有可复用的UDT对象,则从复用器列表中将当前复用器移除

64   m->second.m_iRefCount --;

65   if (0 == m->second.m_iRefCount)

66   {

67      m->second.m_pChannel->close();

68      delete m->second.m_pSndQueue;

69      delete m->second.m_pRcvQueue;

70      delete m->second.m_pTimer;

71      delete m->second.m_pChannel;

72      m_mMultiplexer.erase(m);

73   }

74 }

友情链接
版权所有 Copyright(c)2004-2024 锐英源软件
统一社会信用代码:91410105098562502G 豫ICP备08007559号 最佳分辨率 1440*900
地址:郑州市金水区文化路97号郑州大学北区院内南门附近