精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
这个模块包含反射用的各个角色,以支持反射的建立、设置、播放和中断等操作。这个模块不是类,只是一个概念,它的支撑体是一个文件。
DoSetup
响应RTSP的Setup请求,添加一个RTPSessionOutput
HaveStreamBuffers
用来判断ReflectorStream是否已经向ReflectorSender里发送RTP/RTCP桢。
ReflectorOutput
这是一个类,其对象代表一个反射客户端,不过这个是个虚基类,在系统中没有实体对象,其派生类RTPSessionOutput实现了大部分功能。
RTPSessionOutput
这是一个类,其对象代表一个反射客户端。它的数据来源于ReflectorSender,然后把数据发给两个RTPStream。下面是重要成员描述。
QTSS_Error RTPSessionOutput::WritePacket(StrPtrLen* inPacket, void* inStreamCookie,
UInt32 inFlags, SInt64 packetLatenessInMSec, SInt64* timeToSendThisPacketAgain, UInt64* packetIDPtr,
SInt64* arrivalTimeMSecPtr)
// This writes the packet out to the proper QTSS_RTPStreamObject.这个write函数把桢写入到合适的QTSS_RTPStreamObject
// If this function returns QTSS_WouldBlock, timeToSendThisPacketAgain will如果这个函数返回QTSS_WouldBlock,timeToSendThisPacketAgain会
// be set to # of msec in which the packet can be sent, or -1 if unknown设置为随后应该发送此桢的毫秒时间,或者不知道为-1
获取事务状态,如果不是播放状态,则返回得到事务内的每个流,对于每个流:
如果包是这个流匹配的:
//如果序列号不对,则返回没错误,不发送
//通过判断流属性里的桢ID大小来进行是否已经发送
//如果现在没有准备好发桢,阻塞返回.面向RTCP,通过判断间隔时间来实现
生成流发送用的桢,并生成桢的发送时间。这时如果最老的桢和最新的桢是随后整个缓冲时间(如同now==0情况)时,添加缓冲时间
调用发送函数,发送。
如果有阻塞错误,则设置重发时间;
如果发送成功,则
设置最后两次发送时间间隔,如果间隔大于1000,则调整为5
设置最后发送时间
如果是RTP包,则设置流的最后发送桢ID属性
如果是RTCP包,则设置流的最后发送桢ID属性和最后发送时间属性
如果有错误,则跳出每个流循环
返回发送函数返回的标志。
ReflectorStream
这是一个类,代表一个SSRC源包括的RTP和RTCP结合的反射管理用的流。它有两个ReflectorSender:fRTPSender,fRTCPSender。这两个sender负责管理向RTPSessionOutput发送ReflectorSocket接收到的反射源数据。
这个类对应的对象是ReflectorSession.h中定义
ReflectorStream** fStreamArray;
而fStreamArray是在ReflectorSession::SetupReflectorSession中初始化。
下面是重要成员:
BindSockets
把ReflectorSocket和ReflectorSender绑定起来,并使ReflectorSocket开始读取反射源数据。
AddOutput
把RTPSessionOutput加入到ReflectorStream的管理数组内,为随后进行发送做准备。
ReflectorSender
这是一个类,其对象代表一个具体的反射RTP流或者RTCP流。它被ReflectorStream管理。它和一个具体的ReflectorSocket对应。ReflectorSocket用来接收,ReflectorSender进行发送。其重要成员有:
ReflectPackets
这个函数进行反射桢工作,把ReflectorSender里缓存的桢发送到各个RTPSessionOutput里。
SendPacketsToOutput
这个函数进行具体的向RTPSessionOutput发送工作。
ReflectorSocket
这是一个类,其对象对应一个具体反射源,比如RTP或者RTCP的接收Socket。它和ReflectorSender一一对应。ReflectorSocket接收,ReflectorSender发送。另外这也是反射工作激发的载体,这个类是事务线程,带有消息处理,可以根据消息运行,并驱动Sender进行发送。
这个类对应的对象是在ReflectorStream::BindSockets函数内初始化,对象是管理ReflectorStream管理的。
其重要成员为:
GetIncomingData
具体的桢接收函数。
ProcessPacket
桢处理函数,把接收到的桢传送给ReflectorSender,使桢数据保存到ReflectorSender的内部队列里。
RTPStream
这是一个类,其对象对应一个具体的向播放客户端发送数据的流。在反射机制里,它被RTPSessionOutput管理和驱动,在RTPSessionOutput调用了QTSS_Write以后,执行流程进入到这个类的代码空间里。
OSQueueElem* ReflectorSender::SendPacketsToOutput(ReflectorOutput* theOutput, OSQueueElem* currentPacket,
SInt64 currentTime, SInt64 bucketDelay)
后两个参数是当前时间,成组的输出形成的包延迟时间。
这个延迟时间是这样管理的:代表直播客户端的输出被一组一组地管理成二维形式,而不是线性的一维形式。根据这个二维形式,一个组对应一个延迟时间,越在后面的组,延迟的时间越长。
QTSS_Error RTPSessionOutput::WritePacket(StrPtrLen* inPacket, void* inStreamCookie,
UInt32 inFlags, SInt64 packetLatenessInMSec, SInt64* timeToSendThisPacketAgain, UInt64* packetIDPtr,
SInt64* arrivalTimeMSecPtr)
packetLatenessInMSec是上个函数的bucketDelay
timeToSendThisPacketAgain,再次发送时间,在此函数里有修改。
arrivalTimeMSecPtr到达时间指针,在WritePacket函数里没有修改。
packetTransmitTime
这个成员是QTSS_PacketStruct结构的成员,这个结构是RTPStream类里Write函数所需要的。而packetTransmitTime是桢的实际发送时间,其生成规则为:(到达时间+总缓冲时间)-延迟时间。
fLastIntervalMilliSec
这个成员是ReflectorOutput的成员变量。表达意义为最后一次发送时当前时间减去发送时间的差值,当发送线程效率高时,这个时间是ReflectorSocket的下次运行时间,以使发送更快进行。这是通过ReflectorSender::SendPacketsToOutput函数里的:
if (err == QTSS_WouldBlock)
{ // call us again in # ms to retry on an EAGAIN在#ms后再次调用此函数来尝试一个EAGAIN
if ((timeToSendPacket > 0) && ( (fNextTimeToRun + currentTime) > timeToSendPacket ))
// blocked but we are scheduled to wake up later阻塞但是我们安排在随后唤醒
fNextTimeToRun = timeToSendPacket - currentTime;//以桢发送时间减去当前时间作为下次运行时间,下次运行时间是个间隔,随后会补充上当前时间
if (theOutput->fLastIntervalMilliSec < 5 )
theOutput->fLastIntervalMilliSec = 5;
if ( timeToSendPacket < 0 ) // blocked and we are behind阻塞且我们落后
fNextTimeToRun = theOutput->fLastIntervalMilliSec; // Use the last packet interval 使用最后的桢间隔
if (fNextTimeToRun > 1000) //don't wait that long不等待太长时间
fNextTimeToRun = 1000;
if (fNextTimeToRun < 5) //wait longer长时间等待
fNextTimeToRun = 5;
if (theOutput->fLastIntervalMilliSec >= 1000)
// allow up to 1 second max -- allow some time for the socket to clear and don't go into a tight loop if the client is gone.
//允许高达1秒的最大值--允许给socket一些时间来清除且如果客户端不在时,不会进入紧张的循环
theOutput->fLastIntervalMilliSec = 1000;
else
theOutput->fLastIntervalMilliSec *= 2; // scale upwards over time向overtime量度
//qtss_printf ( "Blocked ReflectorSender::SendPacketsToOutput timeToSendPacket=%qd fLastIntervalMilliSec=%qd fNextTimeToRun=%qd \n", timeToSendPacket, theOutput->fLastIntervalMilliSec, fNextTimeToRun);
break;
}
在形成新值时,如果值大于1000,则修改为5。如果小于5也算5
fNextTimeToRun
它是ReflectorSender类里的成员。用来安排调用ReflectPackets的时机。
在ReflectPackets函数里初始化为10秒,如果是RTCP,则为1秒。随后在SendPacketsToOutput里有协调。在 ReflectPackets函数结束时,对这个值有调整,调整的逻辑如下:
//不要忘记调用者想要知道下次运行时间
if (*ioWakeupTime == 0)//如果休息时间为0
*ioWakeupTime = fNextTimeToRun;//则休息时间为下次运行时间的间隔
else if ((fNextTimeToRun > 0) && (*ioWakeupTime > fNextTimeToRun))//如果休息时间大于下次时间
*ioWakeupTime = fNextTimeToRun;//休息时间等于下次运行时间
// exit with fNextTimeToRun in real time, not relative time.
//fNextTimeToRun加上实际时间,不要用相对时间
fNextTimeToRun += currentTime;
在ReflectorSender类的ShouldReflectNow函数里使用,用来决定是否使ReflectorSender调用ReflectPackets。在ShouldReflectNow里用当前时间和下次运行时间(fNextTimeToRun)来生成fSleepTime,在生成时对不合理值进行了过滤。
在ReflectSocket的Run函数里,以ShouldReflectNow来管理fSleepTime。
err = theOutput->WritePacket(&thePacket->fPacketPtr, fStream, fWriteFlag, packetLateness, &timeToSendPacket,&thePacket->fStreamCountID,&thePacket->fTimeArrived );
这个里面的timeToSendPacket对fNextTimeToRun有影响,影响是通过fLastIntervalMilliSec施加的。
timeToSendPacket
RTPSessionOutput类的WritePacket函数里用来保留下次发送时间。和suggestedWakeupTime一样。
suggestedWakeupTime
这个变量是QTSS_PacketStruct结构的成员。
这个变量是RTPStream类的Write函数在处理发送时对QTSS_PacketStruct结构进行处理时生成。通过fSession->GetOverbufferWindow()->CheckTransmitTime计算而来。
通过RTSP协议的Play请求里的Range内的信息,区分正常反射流和时移流,对于时移流,以时移数据源为数据来源;对于正常反射流,用达尔文的默认缓冲数据源为数据来源。
时移数据源管理要做到添加、删除、定位、加载、保存、过渡和时移数据与达尔文默认缓冲机制结合功能。
添加指的是把达尔文默认缓冲里删除的数据包加入到时移数据源内。
删除指的是把超过时移时限的时移数据从管理范围内删除。
定位指的是把适合range信息时间的管理单元确定。
加载指的是,由于内存不可能缓冲过大数据,时移数据不可能全部保存到内存中,只能实现小部分在内存,大部分在硬盘,在这种情况下,如果需要的时移数据不在内存中,而是在硬盘上,对数据有一个从硬盘加载到内存管理单元内的一种情况。
保存指的是把累积够一个时移单元的时移数据写入到硬盘的过程。
过渡指的是在进行时移时,前一个时移单元已经发送完了,需要发送下一个时移单元时的处理过程。
时移数据与达尔文默认缓冲机制结合指的是把时移数据放到达尔文默认缓冲机制里,还会使这个机制正常工作,使时移的数据包能够发给客户端。
时移源组队列
这个队列按照一个时移单元一组形式管理时移数据,在有需要时,这个队列里才会有实际数据,平常不占用内存。这个队列管理的单元叫时移源组,这个时移源组是一个类,定义如下:
{
引用的ReflectorOutput源个数fRefOutputCount
引用的ReflectorOutput源ID队列,fRefOutputAry
开始时间,绝对秒数,fAbsoStartSeco
结束时间,绝对秒数,fAbsoEndSeco
RTP/RTCP桢队列,fTSPacketQueue
是否已经从硬盘上加载过了fIsLoadFromDisk
是否是准备单元fIsPreparUnit,最后一个单元长度还不够2分钟,这个标志为真
桢队列内单元个数fCountOfPacketQueue
}
这个队列定义在ReflectorSender内。
RTPSessionOutput的输出的数据来源由四个ReflectorSender来提供,同时这四个ReflectorSender的数据又是要提供给多个RTPSessionOutput,为了使每个RTPSessionOutput都能够知道在ReflectorSender里的时移数据情况,建立了这个队列。这个队列里的单元类定义如下:
{
ReflectorSender指针,fpSender
是否第一次处理过,fHaveFirstDeal
当前时移源组单元指针,fpTimeShiftUnit
队列内已经发送桢个数fHaveSendCount
}
这个队列定义在RefletorOutput里(ReflectorOutput是RTPSessionOutput的基类)。
时移源组单元长度
2分钟,120秒。
#define TISH_PACKETQUEUEUNIT_TIMELEN 120//by shw 时移桢队列单元时间长度2分钟
内存占用上,大概大小为120*1.5=180M。
时移源组总长度
30分钟,1800秒。
是否时移标志和时移开始时间
这两个变量添加到ReflectorOutput类里。
Float64 fRangeStartTime;//时移开始时间by shw for timeshift 200611,默认为0
Bool16 fIsTimeShfit;//是否时移,通过对range内的时间进行判断来生成,默认为假,by shw
添加
在ReflectorSender::RemoveOldPackets函数里进行了把数据包从默认缓冲区里删除的操作。
如果ReflectorPacket的fNeededByOutput为假,会让RemoveOldPackets删除ReflectorPacket对象。在删除时把达尔文缓冲拷贝到时移缓冲。
如果没有时移源组队列单元,则新建一个单元,进行添加。新建单元的fIsPreparUnit设置为真。
如果有时移源组队列单元,且最后一个单元的时间跨度没有2分钟,可以使用最后单元。
如果有且最后单元超过2分钟,则新建一个,把上面单元进行保存到硬盘处理,上面单元的fIsPreparUnit设置为假。
删除
在ReflectorSender类的ReflectPackets成员函数里,当执行过RemoveOldPackets后,如果ReflectorOutput是时移的,则先根据ReflectorOutput里的时移发送引用队列里和此ReflectorSender对应的单元的fHaveSendCount和fpTimeShiftUnit里指向的队列个数比较(这个过程和过渡处理结合),如果相等,则表示这个时移源组队列单元里的桢发完了,把fpTimeShiftUnit指向的
引用的ReflectorOutput源个数fRefOutputCount减1
引用的ReflectorOutput源ID队列,fRefOutputAry里删除使用的ReflectorOutput。
上面处理完成后,把fRefOutputCount为0的单元里的桢队列释放掉,使其不占用资源。
上面处理完成后,把开始时间和结束时间都超过缓冲长度时间的时移桢单元从时移源组队列删除,如果单元里桢队列不为空,释放掉。但是如果fRefOutputCount不为0,则不释放。
定位
通过判断
开始时间,绝对秒数,fAbsoStartSeco
结束时间,绝对秒数,fAbsoEndSeco
和时移开始时间的关系来进行。
当根据当前时间和range时间确定一个时移发送时间基点(timeshift_starttime)后,用这个timeshift_startime和时移源组队列里单元里的开始时间和结束时间比较,找到一个单元,如果标志fIsLoadFromDisk为真,则不进行从硬盘加载,如果标志fIsLoadFromDisk为假,则进行从硬盘加载,并且把标志fIsLoadFromDisk设置为真。
加载
时移源组队列单元对应的硬盘文件名生成规则为:
生成的源信息_标准基时间到开始点时间秒数-标准基时间结束点时间秒数。开始点到结束点跨度为2分钟。生成的源信息是一个源IP地址加指定端口。
由于是用ReflectorPacket里的fTimeArrived来生成开始时间和结束时间,所以文件名里表达的时间信息可能不是完全2分钟间隔的,这一点要注意。
加载时先把队列长度加载,接着对此长度个ReflectorPacket对象进行序列化加载。这需要扩充ReflectorPacket类,增加读写函数。
保存
保存时把队列个数先写到文件里,接着是各个ReflectorPacket对象。
过渡
ReflectorSender类的SendPacketsToOutput函数里对某个队列发送个数进行累积,累积的个数可以通过ReflectorOutput里的时移发送引用队列找到此ReflectorSender对应的单元,进行加到fHaveSendCount上。
在ReflectorSender里执行完SendPacketsToOutput循环后,进行处理,如果fHaveSendCount和fpTimeShiftUnit指向的时移源组队列单元里的个数相比较,如果占到3/4则对时移源组队列的下一个单元进行加载,从硬盘里读取内容。如果下一个单元已经加载过了,则不进行加载,如果单元的fIsPreparUnit标志为真,则这个单元还没有保存到硬盘上,则可以直接使用,不用加载。
当一个时移桢队列组里的队列已经发送完时,SendPacketsToOutput在循环执行完后,就会根据队列指针情况,用已有个数和发送个数比较得到不能再向下移动的标志,有了这个标志,就修改ReflectorOutput里时移发送引用队列对应此ReflectorSender哪一项单元里的fpTimeShiftUnit,以指向下一个时移桢队列组单元,把这个单元的队列设置为ReflectorSender的当前发送队列。发送时注意要把桢的时间加上时移开始时间,以使时间上的处理和达尔文的默认缓冲机制想匹配。另外还要处理如下内容:
引用的output源个数fRefOutputCount加1
引用的output源ID队列,fRefOutputAry里加上使用的Output。
上面说的发送时注意要把桢的时间加上时移开始时间是一个因素。
ReflectorSender在ReflectStream类里定义为成员变量,所以可以在AddOutput里,把sender的指针保存到ReflectorOutput里的时移发送引用队列里,并初始化这个队列里的单元,把fpTimeShiftUnit初始化为定位找到的时移源组队列单元指针。
在ReflectorSender的ReflectPackets成员函数里,如果ReflectorOutput是时移的,且时移发送引用队列单元里的是否第一次使用为真,则把fpTimeShiftUnit里的队列做为发送桢队列的指针。如果是否第一次使用为假,则应该从ReflecotrOutptu的GetBookMarkedPacket可以得到上次后续的标记队列内位置,然后从这个位置继续发送。
注意要在SendPacketsToOutput里对时移ReflectorOutput用fpTimeShiftUnit指向的桢队列。