Darwin流媒体服务器支持使用Reliable UDP的方式发送RTP数据。Reliable UDP使用数据重传和拥塞控制算法,与TCP协议采用的算法类似。
在使用Reliable UDP发送RTP数据时调用RTPStream::ReliableRTPWrite,具体实现如下:
// Sends one RTP packet using Reliable UDP.
//
// Before the new packet is handled, every stream object on this session is
// given a chance to retransmit the packets whose retransmit time has come.
// In the branch shown below, the packet is registered with this stream's
// resender (so it can be retransmitted until acked or stale) and then sent.
//
// inBuffer       - the RTP packet to send
// inLen          - size of the packet in bytes
// curPacketDelay - current delay of this packet; subtracted from
//                  fDropAllPacketsForThisStreamDelay to get the packet lifetime
//
// Returns err, which is declared and set in a part of the function that is
// not shown in this excerpt.
//
// NOTE: the "......" line is an elision marker from the excerpt, not code; the
// `else` that follows it pairs with an `if` inside the elided part.
QTSS_Error RTPStream::ReliableRTPWrite(void* inBuffer, UInt32 inLen, const SInt64& curPacketDelay)
{
// Send retransmits for all streams on this session
RTPStream** retransStream = NULL;
UInt32 retransStreamLen = 0;
//
// Send retransmits if we need to.
// The loop walks the qtssCliSesStreamObjects attribute by index and stops at
// the first index for which GetValuePtr does not return QTSS_NoErr.
for (int streamIter = 0; fSession->GetValuePtr(qtssCliSesStreamObjects, streamIter, (void**)&retransStream, &retransStreamLen) == QTSS_NoErr; streamIter++)
{
// Skip slots that yield no stream pointer.
if (retransStream != NULL && *retransStream != NULL)
(*retransStream)->fResender.ResendDueEntries();
}
......
else
{
//
// Assign a lifetime to the packet using the current delay of the packet and
// the time until this packet becomes stale.
fBytesSentThisInterval += inLen;
// Register the packet with the resender BEFORE sending it, so it is already
// tracked by the time an ack for it could arrive.
fResender.AddPacket(inBuffer, inLen, (SInt32)(fDropAllPacketsForThisStreamDelay - curPacketDelay));
// The result of SendTo is deliberately discarded here.
(void)fSockets->GetSocketA()->SendTo(fRemoteAddr, fRemoteRTPPort, inBuffer, inLen);
}
return err;
}
1.在调用fSockets->GetSocketA()->SendTo发送数据前,fResender.AddPacket添加该包到重传数组fPacketArray里,并记录添加的时间以及包的过期时间,代码如下:
// Registers a just-sent RTP packet with the resender so that it can be
// retransmitted until it is acked or expires.
//
// inRTPPacket - the RTP packet; its sequence number is read from the second
//               16-bit word of the buffer (network byte order)
// packetSize  - size of the packet in bytes
// ageLimit    - lifetime of the packet in milliseconds, already reduced by the
//               caller by the packet's current lateness. If it is not
//               positive, the packet is not stored and is counted as expired.
//
// fNumSent is incremented in both cases.
//
// NOTE: the "......" line is an elision marker from the excerpt, not code.
void RTPPacketResender::AddPacket(void * inRTPPacket, UInt32 packetSize, SInt32 ageLimit)
{
//OSMutexLocker packetQLocker(&fPacketQMutex);
// the caller needs to adjust the overall age limit by reducing it
// by the current packet lateness.
// we compute a re-transmit timeout based on the Karn's RTT estimate
UInt16* theSeqNumP = (UInt16*)inRTPPacket;
UInt16 theSeqNum = ntohs(theSeqNumP[1]);
if (ageLimit > 0)
{
RTPResenderEntry* theEntry = this->GetEmptyEntry(theSeqNum, packetSize);
......
// Reset all the information in the RTPResenderEntry
::memcpy(theEntry->fPacketData, inRTPPacket, packetSize);
theEntry->fPacketSize = packetSize;
theEntry->fAddedTime = OS::Milliseconds();// record the time the packet was added
theEntry->fOrigRetransTimeout = fBandwidthTracker->CurRetransmitTimeout();
theEntry->fExpireTime = theEntry->fAddedTime + ageLimit;// record the expiration time
theEntry->fNumResends = 0;
theEntry->fSeqNum = theSeqNum;
//
// Track the number of wasted bytes we have
atomic_add(&sNumWastedBytes, kMaxDataBufferSize - packetSize);
//PLDoubleLinkedListNode<RTPResenderEntry> * listNode = NEW PLDoubleLinkedListNode<RTPResenderEntry>( new RTPResenderEntry(inRTPPacket, packetSize, ageLimit, fRTTEstimator.CurRetransmitTimeout() ) );
//fAckList.AddNodeToTail(listNode);
// Account for the bytes that are now outstanding (sent but not yet acked).
fBandwidthTracker->FillWindow(packetSize);
}
else
{
// The packet has no lifetime left: do not store it, just count it.
fNumExpired++;
}
fNumSent++;
}
2.当采用Reliable UDP模式时,客户端会定期发送APP类型的RTCP包(ack应答包)。应答包的有效负荷由RTP序列号以及紧跟其后的可变长度位掩码组成。序列号标识客户端正在应答的首个RTP数据包,其余每个被应答的RTP数据包都由位掩码中被置位的一个比特表示。位掩码中各比特的位置是相对于该序列号的偏移量:掩码第一个字节的最高位表示序列号比指定序列号大1的数据包,次高位表示大2的数据包,以此类推。位掩码的长度必须是4字节(four octets)的整数倍。
Ack Packet format
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P| subtype |   PT=APP=204  |             length            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           SSRC/CSRC                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                     name (ASCII) = 'qtak'                     |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           SSRC/CSRC                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|            Reserved           |            Seq num            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            Mask...                            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
服务器端有个RTCPTask的任务会在TaskThread的线程里一直执行,接收客户端发送过来的RTCP包,并调用ProcessIncomingRTCPPacket将RTCP包交给相应的RTPStream类处理
// Task body that drains incoming RTCP packets from a socket and hands each one
// to the RTPStream it belongs to.
//
// NOTE: the "......" lines are elision markers from the excerpt, not code.
// theSocket, theDemuxer, thePacket, theRemoteAddr and theRemotePort are set up
// in the elided part, and the return value is produced there as well.
SInt64 RTCPTask::Run()
{
......
while (true) //get all the outstanding packets for this socket
{
// Reset the length so that "nothing received" can be detected below.
thePacket.Len = 0;
theSocket->RecvFrom(&theRemoteAddr, &theRemotePort, thePacket.Ptr,
kMaxRTCPPacketSize, &thePacket.Len);
if (thePacket.Len == 0)
{
// Nothing left to read: ask for a read event on this socket again and stop.
theSocket->RequestEvent(EV_RE);
break;//no more packets on this socket!
}
//if this socket has a demuxer, find the target RTPStream
if (theDemuxer != NULL)
{
// The stream is looked up by the sender's address and port; packets from
// senders without a matching stream are dropped silently.
RTPStream* theStream = (RTPStream*)theDemuxer->GetTask(theRemoteAddr, theRemotePort);
if (theStream != NULL)
theStream->ProcessIncomingRTCPPacket(&thePacket);
}
}
......
}
RTPStream::ProcessIncomingRTCPPacket会对接收的RTCP进行解析,当解析的是RTCPAckPacket::kAckPacketName类型的包时,会调用ProcessAckPacket,具体代码如下:
// Parses the RTCP packet(s) contained in inPacket and dispatches each one by
// packet type. For APP packets whose name is one of the two ack packet names,
// the packet is passed on to ProcessAckPacket.
//
// inPacket - pointer/length pair describing the received RTCP data
//
// NOTE: the "......" lines and the lone "." are elision markers from the
// excerpt, not code, which is why the braces shown here do not balance.
// currentPtr and curTime are set up in the elided part.
void RTPStream::ProcessIncomingRTCPPacket(StrPtrLen* inPacket)
{
......
while (currentPtr.Len > 0)
{
RTCPPacket rtcpPacket;
if (!rtcpPacket.ParsePacket((UInt8*)currentPtr.Ptr, currentPtr.Len))
{
// Release the session mutex before bailing out
// (presumably it was locked in the elided code above - TODO confirm).
fSession->GetSessionMutex()->Unlock();
DEBUG_RTCP_PRINTF(("malformed rtcp packet\n"));
return;//abort if we discover a malformed RTCP packet
}
......
switch (rtcpPacket.GetPacketType())
{
case RTCPPacket::kReceiverPacketType:
.
case RTCPPacket::kAPPPacketType:
{
DEBUG_RTCP_PRINTF(("RTPStream::ProcessIncomingRTCPPacket kAPPPacketType\n"));
Bool16 packetOK = false;
RTCPAPPPacket theAPPPacket;
if (!theAPPPacket.ParseAPPPacket((UInt8*)currentPtr.Ptr, currentPtr.Len))
{
// Same early-exit pattern as above: unlock, then abort.
fSession->GetSessionMutex()->Unlock();
return;//abort if we discover a malformed APP packet
}
UInt32 itemName = theAPPPacket.GetAppPacketName();
// NOTE(review): this assignment repeats the initialization on the previous line.
itemName = theAPPPacket.GetAppPacketName();
switch (itemName)
{
// Both ack packet names are handled identically.
case RTCPAckPacket::kAckPacketName:
case RTCPAckPacket::kAckPacketAlternateName:
{
packetOK = this->ProcessAckPacket(rtcpPacket, curTime);
}
break;
......
}
}
}
RTPStream::ProcessAckPacket会调用fResender.AckPacket对收到的包进行确认(从发送数组中移除)
// Handles an RTCP ack packet received for this stream.
//
// rtcpPacket - the already-parsed RTCP packet that carries the ack
// curTime    - current time, forwarded to the resender for every acked packet
//
// Returns false when the ack payload cannot be parsed, or when a tracker
// exists and reports that the stream is not ready for ack processing yet.
// Returns true otherwise, including when the transport is not Reliable UDP
// (in which case the ack is only logged, not applied).
Bool16 RTPStream::ProcessAckPacket(RTCPPacket &rtcpPacket, SInt64 &curTime)
{
    RTCPAckPacket ackPacket;
    UInt8* rawPacket = rtcpPacket.GetPacketBuffer();
    // The RTCP length field counts 4-byte words and does not include the header.
    UInt32 rawPacketLen = RTCPPacket::kRTCPHeaderSizeInBytes + (rtcpPacket.GetPacketLength() * 4);

    if (!ackPacket.ParseAPPData(rawPacket, rawPacketLen))
        return false;

    // Between RTSP setup and the first packet sent on the stream we must
    // protect against a bad ack: ignore acks until the tracker is ready.
    if (fTracker != NULL && !fTracker->ReadyForAckProcessing())
        return false;

    this->PrintPacketPrefEnabled((char*)rawPacket, rawPacketLen, RTPStream::rtcpACK);

    // Acks are only applied when we are using Reliable UDP.
    if (fTransportType != qtssRTPTransportTypeReliableUDP)
        return true;

    // The sequence number in the packet is acked unconditionally ...
    UInt16 baseSeqNum = ackPacket.GetAckSeqNum();
    fResender.AckPacket(baseSeqNum, curTime);

    // ... and bit N of the mask acks the packet with sequence number base + N + 1.
    UInt16 bitIndex = 0;
    while (bitIndex < ackPacket.GetAckMaskSizeInBits())
    {
        if (ackPacket.IsNthBitEnabled(bitIndex))
            fResender.AckPacket(baseSeqNum + bitIndex + 1, curTime);
        bitIndex++;
    }

    return true;
}
RTPPacketResender::AckPacket调用RTPPacketResender::RemovePacket将确认的包从fPacketArray中移除。
3.对于尚未被确认的包,由RTPPacketResender::ResendDueEntries负责超时重传,代码如下:
void RTPPacketResender::ResendDueEntries()
{
if (fPacketsInList <= 0)
return;
SInt32 numResends = 0;
RTPResenderEntry* theEntry = NULL;
SInt64 curTime = OS::Milliseconds();
for (SInt32 packetIndex = fPacketsInList - 1; packetIndex >= 0; packetIndex--) // walk backwards because remove packet moves array members forward
{
theEntry = &fPacketArray[packetIndex];
if (theEntry->fPacketSize == 0)
continue;
if ((curTime - theEntry->fAddedTime) > fBandwidthTracker->CurRetransmitTimeout())
{
// Change: Only expire packets after they were due to be resent. This gives the client
// a chance to ack them and improves congestion avoidance and RTT calculation
if (curTime > theEntry->fExpireTime)
{
//
// This packet is expired
fNumExpired++;
//qtss_printf("Packet expired: %d\n", ((UInt16*)thePacket)[1]);
fBandwidthTracker->EmptyWindow(theEntry->fPacketSize);
this->RemovePacket(packetIndex);
// qtss_printf("Expired packet %d\n", theEntry->fSeqNum);
continue;
}
// Resend this packet
fSocket->SendTo(fDestAddr, fDestPort, theEntry->fPacketData, theEntry->fPacketSize);
//qtss_printf("Packet resent: %d\n", ((UInt16*)theEntry->fPacketData)[1]);
theEntry->fNumResends++;
fNumResends++;
numResends++;
//qtss_printf("resend loop numResends=%" _S32BITARG_ " packet theEntry->fNumResends=%" _S32BITARG_ " stream fNumResends=\n",numResends,theEntry->fNumResends++, fNumResends);
// ok -- lets try this.. add 1.5x of the INITIAL duration since the last send to the rto estimator
// since we won't get an ack on this packet
// this should keep us from exponentially increasing due o a one time increase
// in the actuall rtt, only AddToEstimate on the first resend ( assume that it's a dupe )
// if it's not a dupe, but rather an actual loss, the subseqnuent actuals wil bring down the average quickly
if (theEntry->fNumResends == 1)
fBandwidthTracker->AddToRTTEstimate((SInt32)((theEntry->fOrigRetransTimeout * 3) / 2));
// qtss_printf("Retransmitted packet %d\n", theEntry->fSeqNum);
theEntry->fAddedTime = curTime;
fBandwidthTracker->AdjustWindowForRetransmit();
continue;
}
}
}
从上面代码可以看出,首先判断是否超时,接着还需要判断包是否过期了,如果过期了直接调用RemovePacket删除,如果没有就重发。
超时判断代码:if((curTime - theEntry->fAddedTime) > fBandwidthTracker->CurRetransmitTimeout())
包过期判断代码:if (curTime > theEntry->fExpireTime)
还有一点关于超时时间RTT的计算,会在后面的文章中说明。