精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
随着UDT::Startup()的启动,GC线程也随之运行了。
GC主要关注的就是UDTSocket的释放,下面我们来看看这个GC线程是怎么实现的。
1 #ifndef WIN32
2 void* CUDTUnited::garbageCollect(void* p)
3 #else
4 DWORD WINAPI CUDTUnited::garbageCollect(LPVOID p)
5 #endif
6 {
7 // 获得当前GC对应的Socket单位
8 CUDTUnited* self = (CUDTUnited*)p;
9
10 // 应用锁保护,这里的锁是CUDTUnited::m_GCStopLock
11 CGuard gcguard(self->m_GCStopLock);
12
13 // 如果Socket单位没有标记为关闭中,那么GC持续生效
14 while (!self->m_bClosing)
15 {
16 self->checkBrokenSockets();
17
18 #ifdef WIN32
19 self->checkTLSValue();
20 #endif
21
22 // 这里处理超时解锁,windows下的WaitForSingleObject函数直接输入超时时间间隔
23 // 而非windows的需要提供绝对时间,所以需要先根据当前时间得到超时时间,然后作为参数传入pthread_cond_timedwait
24 // 实际上就是当计时器用,确保两次检查的时间间隔最多为1秒钟
25 #ifndef WIN32
26 timeval now;
27 timespec timeout;
28 gettimeofday(&now, 0);
29 timeout.tv_sec = now.tv_sec + 1;
30 timeout.tv_nsec = now.tv_usec * 1000;
31
32 pthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);
33 #else
34 WaitForSingleObject(self->m_GCStopCond, 1000);
35 #endif
36 }
37
38 // 到这里说明Socket单位处于正在关闭的状态
39
40 // remove all sockets and multiplexers
41 // 移除所有的Sockets和多路复用器
42
43 // 启动锁保护,这里不直接用CGuard是因为要多次用到这个锁,并且在下面的checkBrokenSockets中也要使用该锁,所以手动处理
44 CGuard::enterCS(self->m_ControlLock);
45 for (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i)
46 {
47 // 标记作废,调用关闭,并标记为CLOSED,更新时间戳,将Socket对象加入已关闭列表
48 i->second->m_pUDT->m_bBroken = true;
49 i->second->m_pUDT->close();
50 i->second->m_Status = CLOSED;
51 i->second->m_TimeStamp = CTimer::getTime();
52 self->m_ClosedSockets[i->first] = i->second;
53
54 // remove from listener's queue
55 // 从监听列表中移除
56 map<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket);
57 if (ls == self->m_Sockets.end())
58 {
59 // 如果在Socket列表和已关闭列表中都未找到监听Socket,则不作处理
60 ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);
61 if (ls == self->m_ClosedSockets.end())
62 continue;
63 }
64
65 // 运行到这里说明找到了监听Socket
66 // 从监听Socket的待Accept及已Accept列表中移除当前Socket
67 CGuard::enterCS(ls->second->m_AcceptLock);
68 ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);
69 ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);
70 CGuard::leaveCS(ls->second->m_AcceptLock);
71 }
72 // 已移除所有Socket项目,清空列表
73 self->m_Sockets.clear();
74
75 // 将已关闭列表中的所有项时间戳标记为0
76 for (map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j)
77 {
78 j->second->m_TimeStamp = 0;
79 }
80
81 // 手动解锁
82 CGuard::leaveCS(self->m_ControlLock);
83
84 while (true)
85 {
86 // 检查作废Socket,直到已关闭Socket列表为空
87 self->checkBrokenSockets();
88
89 CGuard::enterCS(self->m_ControlLock);
90 bool empty = self->m_ClosedSockets.empty();
91 CGuard::leaveCS(self->m_ControlLock);
92
93 if (empty)
94 break;
95
96 CTimer::sleep();
97 }
98
99 #ifndef WIN32
100 return NULL;
101 #else
102 return 0;
103 #endif
104 }
1 void CUDTUnited::checkBrokenSockets()
2 {
3 CGuard cg(m_ControlLock);
4
5 // set of sockets To Be Closed and To Be Removed
6 // tbc要关闭的Socket列表
7 // tbr要移除的Socket列表
8 vector<UDTSOCKET> tbc;
9 vector<UDTSOCKET> tbr;
10
11 // 循环单元中所有Socket
12 for (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i)
13 {
14 // check broken connection
15 // 检查状态是否为已作废
16 if (i->second->m_pUDT->m_bBroken)
17 {
18 if (i->second->m_Status == LISTENING)
19 {
20 // for a listening socket, it should wait an extra 3 seconds in case a client is connecting
21 // 如果该Socket是一个监听Socket,那么需要在作废后3秒钟再处理该Socket,因为可能有客户端正在连接
22 if (CTimer::getTime() - i->second->m_TimeStamp < 3000000)
23 continue;
24 }
25 else if ((i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0))
26 {
27 // if there is still data in the receiver buffer, wait longer
28 // 如果接收缓存中还有数据,并且检查计数大于0,则暂不处理
29 // 检查计数m_iBrokenCounter,在通常情况下为30,调用shutdown时为60,可以理解为30秒、60秒
30 continue;
31 }
32
33 //close broken connections and start removal timer
34 // 关闭作废的连接,并移除计时器
35 // 标记状态为CLOSED,更新时间戳为当前,将Socket加入要关闭的Socket列表
36 i->second->m_Status = CLOSED;
37 i->second->m_TimeStamp = CTimer::getTime();
38 tbc.push_back(i->first);
39 m_ClosedSockets[i->first] = i->second;
40
41 // remove from listener's queue
42 // 从监听列表中移除
43 map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket);
44 if (ls == m_Sockets.end())
45 {
46 // 如果在Socket列表和已关闭列表中都未找到监听Socket,则不作处理
47 ls = m_ClosedSockets.find(i->second->m_ListenSocket);
48 if (ls == m_ClosedSockets.end())
49 continue;
50 }
51
52 // 运行到这里说明找到了监听Socket
53 // 从监听Socket的待Accept及已Accept列表中移除当前Socket
54 CGuard::enterCS(ls->second->m_AcceptLock);
55 ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);
56 ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);
57 CGuard::leaveCS(ls->second->m_AcceptLock);
58 }
59 }
60
61 // 遍历已关闭Socket列表中的项
62 for (map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j)
63 {
64 // 有犹豫时间
65 if (j->second->m_pUDT->m_ullLingerExpiration > 0)
66 {
67 // asynchronous close:
68 // 异步关闭
69 // 关闭条件为:发送列表已经为空,或者发送列表的当前缓存大小为0,或者超出犹豫时间
70 if ((NULL == j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_ullLingerExpiration <= CTimer::getTime()))
71 {
72 j->second->m_pUDT->m_ullLingerExpiration = 0;
73 j->second->m_pUDT->m_bClosing = true;
74 j->second->m_TimeStamp = CTimer::getTime();
75 }
76 }
77
78 // timeout 1 second to destroy a socket AND it has been removed from RcvUList
79 // Socket被标记时间戳后1秒钟,并且已经从接收列表节点中移除,那么把它放入可移除列表
80 if ((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList))
81 {
82 tbr.push_back(j->first);
83 }
84 }
85
86 // move closed sockets to the ClosedSockets structure
87 // 将tbc可关闭Socket列表中的对象从当前Socket列表中移除,这里作者的注释有问题
88 // 实际上在上面m_ClosedSockets[i->first] = i->second;已经把这些Socket添加到ClosedSockets中了
89 for (vector<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)
90 m_Sockets.erase(*k);
91
92 // remove those timeout sockets
93 // 对tbr可移除Socket列表中的项执行removeSocket
94 for (vector<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)
95 removeSocket(*l);
96 }
1 void CUDTUnited::removeSocket(const UDTSOCKET u)
2 {
3 // 可移除的Socket必须在m_ClosedSockets列表中存在
4 map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u);
5
6 // invalid socket ID
7 // 如果不存在则不予移除
8 if (i == m_ClosedSockets.end())
9 return;
10
11 // decrease multiplexer reference count, and remove it if necessary
12 // 获取复用器ID
13 const int mid = i->second->m_iMuxID;
14
15 // 如果待Accept队列不为空
16 if (NULL != i->second->m_pQueuedSockets)
17 {
18 CGuard::enterCS(i->second->m_AcceptLock);
19
20 // if it is a listener, close all un-accepted sockets in its queue and remove them later
21 // 如果当前Socket是一个监听Socket,就要关闭所有尚未Accept的连接并在稍后将它们移除
22 for (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q)
23 {
24 m_Sockets[*q]->m_pUDT->m_bBroken = true;
25 m_Sockets[*q]->m_pUDT->close();
26 m_Sockets[*q]->m_TimeStamp = CTimer::getTime();
27 m_Sockets[*q]->m_Status = CLOSED;
28 m_ClosedSockets[*q] = m_Sockets[*q];
29 m_Sockets.erase(*q);
30 }
31
32 CGuard::leaveCS(i->second->m_AcceptLock);
33 }
34
35 // remove from peer rec
36 // 从m_PeerRec中移除该Socket信息
37 // m_PeerRec用来记录连接节点信息以防止重复的连接请求,必须在关闭时移除以保证下次连接不会被拒绝
38 map<int64_t, set<UDTSOCKET> >::iterator j = m_PeerRec.find((i->second->m_PeerID << 30) + i->second->m_iISN);
39 if (j != m_PeerRec.end())
40 {
41 j->second.erase(u);
42 if (j->second.empty())
43 m_PeerRec.erase(j);
44 }
45
46 // delete this one
47 // 关闭、删除当前Socket对象,并从m_ClosedSockets列表中移除
48 i->second->m_pUDT->close();
49 delete i->second;
50 m_ClosedSockets.erase(i);
51
52 // 从复用器列表中查找当前的复用器
53 map<int, CMultiplexer>::iterator m;
54 m = m_mMultiplexer.find(mid);
55 if (m == m_mMultiplexer.end())
56 {
57 //something is wrong!!!
58 // 如果没找到肯定有什么地方搞错了,但是这里没做处理
59 return;
60 }
61
62 // 把当前复用器所对应的UDT计数减一
63 // 如果减至0,说明已经没有可复用的UDT对象,则从复用器列表中将当前复用器移除
64 m->second.m_iRefCount --;
65 if (0 == m->second.m_iRefCount)
66 {
67 m->second.m_pChannel->close();
68 delete m->second.m_pSndQueue;
69 delete m->second.m_pRcvQueue;
70 delete m->second.m_pTimer;
71 delete m->second.m_pChannel;
72 m_mMultiplexer.erase(m);
73 }
74 }