DSS 代码分析【Reliable UDP之数据重传】


Darwin 流媒体服务器支持使用Reliable UDP的方式发送RTP。Reliable UDP使用数据重传和拥塞控制算法,与TCP协议采用的算法类似。

1.在使用Reliable UDP发送RTP数据时调用RTPStream::ReliableRTPWrite,具体实现如下:

// Excerpt of RTPStream::ReliableRTPWrite (braces and some statements are omitted in
// this listing). Visible steps: walk the session's stream objects, add inLen to
// fBytesSentThisInterval, hand the packet to fResender, then send it on socket A.
QTSS_Error RTPStream::ReliableRTPWrite(void* inBuffer, UInt32 inLen, const SInt64& curPacketDelay)
	// Send retransmits for all streams on this session
	RTPStream** retransStream = NULL;
	UInt32 retransStreamLen = 0;
	// Send retransmits if we need to.
	// The loop asks fSession for each qtssCliSesStreamObjects entry in turn and stops
	// at the first index for which GetValuePtr does not return QTSS_NoErr.
	for (int streamIter = 0; fSession->GetValuePtr(qtssCliSesStreamObjects, streamIter, (void**)&retransStream, &retransStreamLen) == QTSS_NoErr; streamIter++)
		// The body of this check is not shown in the listing; the article text says it
		// calls (*retransStream)->fResender.ResendDueEntries().
		if (retransStream != NULL && *retransStream != NULL)
		// Assign a lifetime to the packet using the current delay of the packet and
		// the time until this packet becomes stale.
		fBytesSentThisInterval += inLen;
		// Age limit handed to the resender: fDropAllPacketsForThisStreamDelay minus the
		// packet's current delay, narrowed to SInt32.
		fResender.AddPacket(inBuffer, inLen, (SInt32)(fDropAllPacketsForThisStreamDelay - curPacketDelay));
		// The result of SendTo is explicitly discarded by the (void) cast.
		(void)fSockets->GetSocketA()->SendTo(fRemoteAddr, fRemoteRTPPort, inBuffer, inLen);
	// NOTE(review): err is not declared in the lines shown — presumably declared in an omitted line.
	return err;

// Excerpt of RTPPacketResender::AddPacket (braces omitted in this listing).
// Stores a copy of an outgoing RTP packet in a resender entry so it can be sent again later.
//   inRTPPacket - pointer to the packet bytes; the sequence number is read from its second 16-bit word
//   packetSize  - number of bytes copied into the entry
//   ageLimit    - lifetime added to the current time to form fExpireTime; in the lines
//                 shown, nothing is stored unless ageLimit > 0
void RTPPacketResender::AddPacket(void * inRTPPacket, UInt32 packetSize, SInt32 ageLimit)
	//OSMutexLocker packetQLocker(&fPacketQMutex);
	// the caller needs to adjust the overall age limit by reducing it
	// by the current packet lateness.
	// we compute a re-transmit timeout based on the Karns RTT estimate
	UInt16* theSeqNumP = (UInt16*)inRTPPacket;
	// Second 16-bit word of the packet, converted from network to host byte order.
	UInt16 theSeqNum = ntohs(theSeqNumP[1]);
	if (ageLimit > 0)
		RTPResenderEntry* theEntry = this->GetEmptyEntry(theSeqNum, packetSize);
		// Reset all the information in the RTPResenderEntry
		// NOTE(review): theEntry is used without a NULL check in the lines shown — confirm
		// GetEmptyEntry cannot return NULL, and that fPacketData can hold packetSize bytes.
		::memcpy(theEntry->fPacketData, inRTPPacket, packetSize);
		theEntry->fPacketSize = packetSize;
		theEntry->fAddedTime = OS::Milliseconds();//record the time the entry was added
		// Snapshot of the tracker's current retransmit timeout, kept with the entry.
		theEntry->fOrigRetransTimeout = fBandwidthTracker->CurRetransmitTimeout();
		theEntry->fExpireTime = theEntry->fAddedTime + ageLimit;//record the expiry time
		theEntry->fNumResends = 0;
		theEntry->fSeqNum = theSeqNum;

		// Track the number of wasted bytes we have
		// (kMaxDataBufferSize minus the bytes this packet actually uses).
		atomic_add(&sNumWastedBytes, kMaxDataBufferSize - packetSize);

		//PLDoubleLinkedListNode<RTPResenderEntry> * listNode = NEW PLDoubleLinkedListNode<RTPResenderEntry>( new RTPResenderEntry(inRTPPacket, packetSize, ageLimit, fRTTEstimator.CurRetransmitTimeout() ) );
2.当采用Reliable UDP模式时,客户端会定期发送APP类型的RTCP包(ack应答包)。应答包的有效负荷由一个RTP序列号以及紧跟其后的可变长度位掩码组成。序列号标识客户端正在应答的首个RTP数据包,此外每个被应答的RTP数据包都由位掩码中被置位的一个比特来表示。位掩码中各个位的位置表示相对于该序列号的偏移量:掩码第一个字节的最高位表示序列号比指定序列号大1的数据包,第二位表示序列号比指定序列号大2的数据包,以此类推。位掩码的长度必须是4字节(four octets)的整数倍。

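Ack Packet format
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P| subtype |   PT=APP=204  |             length            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           SSRC/CSRC                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                     name (ASCII)  = 'qtak'                    |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           SSRC/CSRC                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|          Reserved             |          Seq num              |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            Mask...                            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+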

                               // Excerpt of RTCPTask::Run (only the per-socket receive loop is shown; braces and
                               // surrounding code are omitted). Reads packets from theSocket until none remain
                               // and looks up the RTPStream registered for each sender.
                               SInt64 RTCPTask::Run()
                                        while (true) //get all the outstanding packets for this socket
						// Clear the length before each receive so a zero afterwards means nothing was read.
						thePacket.Len = 0;
						theSocket->RecvFrom(&theRemoteAddr, &theRemotePort, thePacket.Ptr,
							kMaxRTCPPacketSize, &thePacket.Len);
						if (thePacket.Len == 0)
							break;//no more packets on this socket!

						//if this socket has a demuxer, find the target RTPStream
						if (theDemuxer != NULL)
							// The demuxer is queried with the sender's address and port.
							RTPStream* theStream = (RTPStream*)theDemuxer->GetTask(theRemoteAddr, theRemotePort);
							// What happens for a non-NULL stream is not shown in this listing.
							if (theStream != NULL)

// Excerpt of RTPStream::ProcessIncomingRTCPPacket (braces and several statements are
// omitted in this listing). Parses RTCP packets from the buffer while currentPtr.Len > 0
// and dispatches on packet type; APP packets whose name matches one of the two ack
// names are passed to ProcessAckPacket.
void RTPStream::ProcessIncomingRTCPPacket(StrPtrLen* inPacket)
	// NOTE(review): the declaration of currentPtr and the code that advances it are not shown here.
	while (currentPtr.Len > 0)
		RTCPPacket rtcpPacket;
		if (!rtcpPacket.ParsePacket((UInt8*)currentPtr.Ptr, currentPtr.Len))
			DEBUG_RTCP_PRINTF(("malformed rtcp packet\n"));
			return;//abort if we discover a malformed RTCP packet
		switch (rtcpPacket.GetPacketType())
		// Handling of receiver packets is not shown in this listing.
		case RTCPPacket::kReceiverPacketType:

		case RTCPPacket::kAPPPacketType:
				DEBUG_RTCP_PRINTF(("RTPStream::ProcessIncomingRTCPPacket kAPPPacketType\n"));
				Bool16 packetOK = false;
				RTCPAPPPacket theAPPPacket;
				if (!theAPPPacket.ParseAPPPacket((UInt8*)currentPtr.Ptr, currentPtr.Len))
					return;//abort if we discover a malformed APP packet
				UInt32 itemName = theAPPPacket.GetAppPacketName();
				// NOTE(review): this repeats the assignment on the previous line and looks redundant.
				itemName = theAPPPacket.GetAppPacketName();
				switch (itemName)

				// Both ack names are routed to the same handler.
				case RTCPAckPacket::kAckPacketName:
				case RTCPAckPacket::kAckPacketAlternateName:
						// NOTE(review): curTime is not declared in the lines shown.
						packetOK = this->ProcessAckPacket(rtcpPacket, curTime);
// Excerpt of RTPStream::ProcessAckPacket (braces omitted in this listing).
// Parses an ack packet and, when the stream uses Reliable UDP, tells fResender about
// every acknowledged sequence number.
//   rtcpPacket - already-parsed RTCP packet whose buffer holds the ack
//   curTime    - passed through to fResender.AckPacket
// Returns false if the ack data cannot be parsed or the tracker is not ready for
// ack processing; otherwise returns true.
Bool16 RTPStream::ProcessAckPacket(RTCPPacket &rtcpPacket, SInt64 &curTime)
	RTCPAckPacket theAckPacket;
	UInt8* packetBuffer = rtcpPacket.GetPacketBuffer();
	// Byte length = GetPacketLength() * 4 plus the fixed RTCP header size.
	UInt32 packetLen = (rtcpPacket.GetPacketLength() * 4) + RTCPPacket::kRTCPHeaderSizeInBytes;

	if (!theAckPacket.ParseAPPData(packetBuffer, packetLen))
		return false;

	if (NULL != fTracker && false == fTracker->ReadyForAckProcessing()) // this stream must be ready to receive acks.  Between RTSP setup and sending of first packet on stream we must protect against a bad ack.
		return false;//abort if we receive an ack when we haven't sent anything.

	this->PrintPacketPrefEnabled((char*)packetBuffer, packetLen, RTPStream::rtcpACK);
	// Only check for ack packets if we are using Reliable UDP
	if (fTransportType == qtssRTPTransportTypeReliableUDP)
		// The sequence number carried in the packet is acknowledged first.
		UInt16 theSeqNum = theAckPacket.GetAckSeqNum();
		fResender.AckPacket(theSeqNum, curTime);
		//qtss_printf("Got ack: %d\n",theSeqNum);

		// Bit n of the mask (counting from 0) acknowledges sequence number theSeqNum + n + 1.
		for (UInt16 maskCount = 0; maskCount < theAckPacket.GetAckMaskSizeInBits(); maskCount++)
			if (theAckPacket.IsNthBitEnabled(maskCount))
				// NOTE(review): the sum is computed as int; presumably AckPacket takes a
				// 16-bit sequence number so it wraps past 65535 — confirm against its declaration.
				fResender.AckPacket(theSeqNum + maskCount + 1, curTime);
				//qtss_printf("Got ack in mask: %d\n",theSeqNum + maskCount + 1);
	return true;
RTPPacketResender::AckPacket调用 RTPPacketResender::RemovePacket将确认的包从fPacketArray中移除。
3.每次调用RTPStream::ReliableRTPWrite发送数据时,首先都会调用(*retransStream)->fResender.ResendDueEntries()检查是否有包在超时时间内没有收到确认,如果有就重发。ResendDueEntries()的具体代码如下:
// Excerpt of RTPPacketResender::ResendDueEntries (braces and several statements are
// omitted in this listing). Walks fPacketArray from the last used slot to the first and
// re-sends every entry whose time since fAddedTime exceeds the tracker's current
// retransmit timeout.
void RTPPacketResender::ResendDueEntries()
	// The statement guarded by this check is not shown — presumably an early return when the list is empty.
	if (fPacketsInList <= 0)

	SInt32 numResends = 0;
	RTPResenderEntry* theEntry = NULL;
	// A single timestamp is taken once and used for every entry in this pass.
	SInt64 curTime = OS::Milliseconds();
	for (SInt32 packetIndex = fPacketsInList - 1; packetIndex >= 0; packetIndex--) // walk backwards because remove packet moves array members forward
		theEntry = &fPacketArray[packetIndex];

		// The statement guarded by this check is not shown — presumably it skips empty slots.
		if (theEntry->fPacketSize == 0)

		// Timeout test: time since the entry was added (or last resent, see the
		// fAddedTime update below) versus the current retransmit timeout.
		if ((curTime - theEntry->fAddedTime) > fBandwidthTracker->CurRetransmitTimeout())
			// Change:  Only expire packets after they were due to be resent. This gives the client
			// a chance to ack them and improves congestion avoidance and RTT calculation
			// Expiry test; the handling of an expired packet is not shown in this listing.
			if (curTime > theEntry->fExpireTime)
				// This packet is expired
				//qtss_printf("Packet expired: %d\n", ((UInt16*)thePacket)[1]);
				//              qtss_printf("Expired packet %d\n", theEntry->fSeqNum);

			// Resend this packet
			fSocket->SendTo(fDestAddr, fDestPort, theEntry->fPacketData, theEntry->fPacketSize);
			//qtss_printf("Packet resent: %d\n", ((UInt16*)theEntry->fPacketData)[1]);



			// NOTE(review): no active statement in this listing increments theEntry->fNumResends
			// or numResends; presumably those lines were omitted.
			//qtss_printf("resend loop numResends=%" _S32BITARG_ " packet theEntry->fNumResends=%" _S32BITARG_ " stream fNumResends=\n",numResends,theEntry->fNumResends++, fNumResends);

			// ok -- lets try this.. add 1.5x of the INITIAL duration since the last send to the rto estimator
			// since we won't get an ack on this packet
			// this should keep us from exponentially increasing due to a one time increase
			// in the actual rtt, only AddToEstimate on the first resend ( assume that it's a dupe )
			// if it's not a dupe, but rather an actual loss, the subsequent actuals will bring down the average quickly

			if (theEntry->fNumResends == 1)
				// (fOrigRetransTimeout * 3) / 2 is the 1.5x figure described above.
				fBandwidthTracker->AddToRTTEstimate((SInt32)((theEntry->fOrigRetransTimeout * 3) / 2));

			//          qtss_printf("Retransmitted packet %d\n", theEntry->fSeqNum);
			// Restart the timeout clock for this entry from the moment of the resend.
			theEntry->fAddedTime = curTime;



超时判断代码:if((curTime - theEntry->fAddedTime) > fBandwidthTracker->CurRetransmitTimeout())

包过期判断代码:if (curTime > theEntry->fExpireTime)

