
DSS Code Analysis: Reliable UDP Data Retransmission

张俊茂
2023-12-01

The Darwin Streaming Server (DSS) can send RTP over Reliable UDP. Reliable UDP adds data retransmission and congestion control, using algorithms similar to those of TCP.

When RTP data is sent over Reliable UDP, RTPStream::ReliableRTPWrite is called. Its implementation is as follows:

QTSS_Error RTPStream::ReliableRTPWrite(void* inBuffer, UInt32 inLen, const SInt64& curPacketDelay)
{
	// Send retransmits for all streams on this session
	RTPStream** retransStream = NULL;
	UInt32 retransStreamLen = 0;
	//
	// Send retransmits if we need to
	for (int streamIter = 0; fSession->GetValuePtr(qtssCliSesStreamObjects, streamIter, (void**)&retransStream, &retransStreamLen) == QTSS_NoErr; streamIter++)
	{
		if (retransStream != NULL && *retransStream != NULL)
			(*retransStream)->fResender.ResendDueEntries();
	}
......
	else
	{
		//
		// Assign a lifetime to the packet using the current delay of the packet and
		// the time until this packet becomes stale.
		fBytesSentThisInterval += inLen;
		fResender.AddPacket(inBuffer, inLen, (SInt32)(fDropAllPacketsForThisStreamDelay - curPacketDelay));
		(void)fSockets->GetSocketA()->SendTo(fRemoteAddr, fRemoteRTPPort, inBuffer, inLen);
	}
	return err;
}
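1. Before fSockets->GetSocketA()->SendTo sends the data, fResender.AddPacket adds the packet to the retransmission array fPacketArray and records both the time the packet was added and the time at which it expires. The code is as follows: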

void RTPPacketResender::AddPacket(void * inRTPPacket, UInt32 packetSize, SInt32 ageLimit)
{
	//OSMutexLocker packetQLocker(&fPacketQMutex);
	// the caller needs to adjust the overall age limit by reducing it
	// by the current packet lateness.
	// we compute a re-transmit timeout based on the Karn's RTT estimate
	UInt16* theSeqNumP = (UInt16*)inRTPPacket;
	UInt16 theSeqNum = ntohs(theSeqNumP[1]);
	if (ageLimit > 0)
	{
		RTPResenderEntry* theEntry = this->GetEmptyEntry(theSeqNum, packetSize);
......
		// Reset all the information in the RTPResenderEntry
		::memcpy(theEntry->fPacketData, inRTPPacket, packetSize);
		theEntry->fPacketSize = packetSize;
		theEntry->fAddedTime = OS::Milliseconds(); // record the time the packet was added
		theEntry->fOrigRetransTimeout = fBandwidthTracker->CurRetransmitTimeout();
		theEntry->fExpireTime = theEntry->fAddedTime + ageLimit; // record the expiry time
		theEntry->fNumResends = 0;
		theEntry->fSeqNum = theSeqNum;

		//
		// Track the number of wasted bytes we have
		atomic_add(&sNumWastedBytes, kMaxDataBufferSize - packetSize);

		//PLDoubleLinkedListNode<RTPResenderEntry> * listNode = NEW PLDoubleLinkedListNode<RTPResenderEntry>( new RTPResenderEntry(inRTPPacket, packetSize, ageLimit, fRTTEstimator.CurRetransmitTimeout() ) );
		//fAckList.AddNodeToTail(listNode);
		fBandwidthTracker->FillWindow(packetSize);
	}
	else
	{
		fNumExpired++;
	}
	fNumSent++;
}
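The bookkeeping done by AddPacket can be illustrated with a minimal, standalone sketch. ResendEntrySketch, ReadRtpSeqNum and MakeEntry are hypothetical names introduced here for illustration and are not part of DSS; the sketch only assumes the standard RTP header layout, in which the sequence number occupies bytes 2 and 3 in network byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical, simplified stand-in for RTPResenderEntry (bookkeeping only).
struct ResendEntrySketch {
    std::uint16_t seqNum;      // RTP sequence number, host byte order
    std::int64_t  addedTime;   // ms timestamp of the most recent send
    std::int64_t  expireTime;  // ms timestamp after which the packet is stale
    std::uint32_t numResends;  // how many times the packet has been resent
};

// The RTP sequence number occupies bytes 2 and 3 of the header, big-endian.
// Reading byte by byte avoids the alignment assumption behind a UInt16* cast.
inline std::uint16_t ReadRtpSeqNum(const std::uint8_t* packet, std::size_t len) {
    assert(len >= 4);
    return static_cast<std::uint16_t>((packet[2] << 8) | packet[3]);
}

// Returns true and fills outEntry when the packet still has lifetime left;
// returns false when ageLimitMs <= 0, the case AddPacket counts as expired.
inline bool MakeEntry(const std::uint8_t* packet, std::size_t len,
                      std::int32_t ageLimitMs, std::int64_t nowMs,
                      ResendEntrySketch* outEntry) {
    if (ageLimitMs <= 0)
        return false;
    outEntry->seqNum = ReadRtpSeqNum(packet, len);
    outEntry->addedTime = nowMs;
    outEntry->expireTime = nowMs + ageLimitMs;
    outEntry->numResends = 0;
    return true;
}
```

As in the DSS code, a packet whose remaining lifetime is already zero or negative is never stored for retransmission; it is sent once and then forgotten.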
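2. In Reliable UDP mode, the client periodically sends RTCP packets of type APP (ack packets). The payload of an ack packet consists of an RTP sequence number followed by a variable-length bit mask. The sequence number identifies the first RTP packet the client is acknowledging; every additional acknowledged packet is represented by a bit set in the mask. Bit positions in the mask are offsets from the specified sequence number: the high-order bit of the first byte represents the packet whose sequence number is one greater than the specified number, the second bit represents the packet two greater, and so on. The mask must be sent in multiples of four octets.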

Ack Packet format
    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |V=2|P| subtype |   PT=APP=204  |             length            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                           SSRC/CSRC                           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                     name (ASCII) = 'qtak'                     |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                           SSRC/CSRC                           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |           Reserved            |            Seq num            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                            Mask...                            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
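To make the mask encoding concrete, here is a small sketch that expands a sequence number and mask into the full list of acknowledged sequence numbers. ExpandAck is a hypothetical helper, not a DSS function. It assumes the mask bytes have already been located in the packet and that bits are numbered from the most significant bit of the first mask byte, as the description above states.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Expands an ack payload into the list of acknowledged sequence numbers.
// firstSeq is the "Seq num" field; mask points at the mask bytes after it.
// Bit 0 is the most significant bit of mask[0] and stands for firstSeq + 1.
inline std::vector<std::uint16_t> ExpandAck(std::uint16_t firstSeq,
                                            const std::uint8_t* mask,
                                            std::size_t maskLenBytes) {
    assert(mask != nullptr || maskLenBytes == 0);
    std::vector<std::uint16_t> acked;
    acked.push_back(firstSeq);  // the packet named by "Seq num" is always acked
    for (std::size_t bit = 0; bit < maskLenBytes * 8; ++bit) {
        const std::uint8_t byte = mask[bit / 8];
        const bool isSet = (byte & (0x80u >> (bit % 8))) != 0;
        if (isSet) {
            // Cast back to 16 bits so that numbers wrap from 65535 to 0.
            acked.push_back(static_cast<std::uint16_t>(firstSeq + bit + 1));
        }
    }
    return acked;
}
```

For example, with firstSeq = 65534 and mask bytes A0 00 00 00 (bits 0 and 2 set), the acknowledged packets are 65534, 65535 and 1. The last value shows why the sum has to be truncated to 16 bits: RTP sequence numbers wrap around.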
On the server, an RTCPTask runs continuously on a TaskThread. It receives the RTCP packets sent by clients and calls ProcessIncomingRTCPPacket to hand each packet to the corresponding RTPStream:

SInt64 RTCPTask::Run()
{
......
	while (true) //get all the outstanding packets for this socket
	{
		thePacket.Len = 0;
		theSocket->RecvFrom(&theRemoteAddr, &theRemotePort, thePacket.Ptr,
			kMaxRTCPPacketSize, &thePacket.Len);
		if (thePacket.Len == 0)
		{
			theSocket->RequestEvent(EV_RE);
			break;//no more packets on this socket!
		}

		//if this socket has a demuxer, find the target RTPStream
		if (theDemuxer != NULL)
		{
			RTPStream* theStream = (RTPStream*)theDemuxer->GetTask(theRemoteAddr, theRemotePort);
			if (theStream != NULL)
				theStream->ProcessIncomingRTCPPacket(&thePacket);
		}
	}
......
}
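The demultiplexing step in this loop amounts to looking up the sender's address and port in a table of streams. The following sketch shows that idea with a std::map. DemuxerSketch, StreamSketch and Dispatch are hypothetical names; the real demuxer in DSS may be implemented quite differently.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>

// Hypothetical stand-in for RTPStream; it only counts dispatched packets.
struct StreamSketch { int packetsSeen = 0; };

// Maps a (remote address, remote port) pair to the stream that owns it.
class DemuxerSketch {
public:
    void Register(std::uint32_t addr, std::uint16_t port, StreamSketch* stream) {
        assert(stream != nullptr);
        fTable[std::make_pair(addr, port)] = stream;
    }
    // Returns the stream registered for this sender, or nullptr if unknown.
    StreamSketch* Lookup(std::uint32_t addr, std::uint16_t port) const {
        auto it = fTable.find(std::make_pair(addr, port));
        return it == fTable.end() ? nullptr : it->second;
    }
private:
    std::map<std::pair<std::uint32_t, std::uint16_t>, StreamSketch*> fTable;
};

// Mirrors the dispatch step of the loop: packets from unknown senders are dropped.
inline bool Dispatch(const DemuxerSketch& demuxer, std::uint32_t addr, std::uint16_t port) {
    StreamSketch* stream = demuxer.Lookup(addr, port);
    if (stream == nullptr)
        return false;
    ++stream->packetsSeen;
    return true;
}
```

Keying on both address and port matters because several clients behind one address, or several streams of one client, can share a server socket.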
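RTPStream::ProcessIncomingRTCPPacket parses the received RTCP packet. When the packet is an APP packet whose name is RTCPAckPacket::kAckPacketName (or kAckPacketAlternateName), it calls ProcessAckPacket. The code is as follows: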

void RTPStream::ProcessIncomingRTCPPacket(StrPtrLen* inPacket)
{
......
	while (currentPtr.Len > 0)
	{
		RTCPPacket rtcpPacket;
		if (!rtcpPacket.ParsePacket((UInt8*)currentPtr.Ptr, currentPtr.Len))
		{
			fSession->GetSessionMutex()->Unlock();
			DEBUG_RTCP_PRINTF(("malformed rtcp packet\n"));
			return;//abort if we discover a malformed RTCP packet
		}
......
		switch (rtcpPacket.GetPacketType())
		{
		case RTCPPacket::kReceiverPacketType:
			......

		case RTCPPacket::kAPPPacketType:
			{
				DEBUG_RTCP_PRINTF(("RTPStream::ProcessIncomingRTCPPacket kAPPPacketType\n"));
				Bool16 packetOK = false;
				RTCPAPPPacket theAPPPacket;
				if (!theAPPPacket.ParseAPPPacket((UInt8*)currentPtr.Ptr, currentPtr.Len))
				{
					fSession->GetSessionMutex()->Unlock();
					return;//abort if we discover a malformed receiver report
				}
				UInt32 itemName = theAPPPacket.GetAppPacketName();
				itemName = theAPPPacket.GetAppPacketName();
				switch (itemName)
				{

				case RTCPAckPacket::kAckPacketName:
				case RTCPAckPacket::kAckPacketAlternateName:
					{
						packetOK = this->ProcessAckPacket(rtcpPacket, curTime);
					}
					break;
......
		}
	}
}
RTPStream::ProcessAckPacket calls fResender.AckPacket to acknowledge each packet named in the ack (removing it from the send array):
Bool16 RTPStream::ProcessAckPacket(RTCPPacket &rtcpPacket, SInt64 &curTime)
{
	RTCPAckPacket theAckPacket;
	UInt8* packetBuffer = rtcpPacket.GetPacketBuffer();
	UInt32 packetLen = (rtcpPacket.GetPacketLength() * 4) + RTCPPacket::kRTCPHeaderSizeInBytes;

	if (!theAckPacket.ParseAPPData(packetBuffer, packetLen))
		return false;

	if (NULL != fTracker && false == fTracker->ReadyForAckProcessing()) // this stream must be ready to receive acks.  Between RTSP setup and sending of first packet on stream we must protect against a bad ack.
		return false;//abort if we receive an ack when we haven't sent anything.


	this->PrintPacketPrefEnabled((char*)packetBuffer, packetLen, RTPStream::rtcpACK);
	// Only check for ack packets if we are using Reliable UDP
	if (fTransportType == qtssRTPTransportTypeReliableUDP)
	{
		UInt16 theSeqNum = theAckPacket.GetAckSeqNum();
		fResender.AckPacket(theSeqNum, curTime);
		//qtss_printf("Got ack: %d\n",theSeqNum);

		for (UInt16 maskCount = 0; maskCount < theAckPacket.GetAckMaskSizeInBits(); maskCount++)
		{
			if (theAckPacket.IsNthBitEnabled(maskCount))
			{
				fResender.AckPacket(theSeqNum + maskCount + 1, curTime);
				//qtss_printf("Got ack in mask: %d\n",theSeqNum + maskCount + 1);
			}
		}
	}
	return true;
}
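RTPPacketResender::AckPacket calls RTPPacketResender::RemovePacket to remove the acknowledged packet from fPacketArray.
3. Every call to RTPStream::ReliableRTPWrite first calls (*retransStream)->fResender.ResendDueEntries() to check whether any packet has gone unacknowledged for longer than the retransmit timeout; if so, that packet is resent. The code of ResendDueEntries() is as follows: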
void RTPPacketResender::ResendDueEntries()
{
	if (fPacketsInList <= 0)
		return;

	SInt32 numResends = 0;
	RTPResenderEntry* theEntry = NULL;
	SInt64 curTime = OS::Milliseconds();
	for (SInt32 packetIndex = fPacketsInList - 1; packetIndex >= 0; packetIndex--) // walk backwards because remove packet moves array members forward
	{
		theEntry = &fPacketArray[packetIndex];

		if (theEntry->fPacketSize == 0)
			continue;

		if ((curTime - theEntry->fAddedTime) > fBandwidthTracker->CurRetransmitTimeout())
		{
			// Change:  Only expire packets after they were due to be resent. This gives the client
			// a chance to ack them and improves congestion avoidance and RTT calculation
			if (curTime > theEntry->fExpireTime)
			{
				//
				// This packet is expired
				fNumExpired++;
				//qtss_printf("Packet expired: %d\n", ((UInt16*)thePacket)[1]);
				fBandwidthTracker->EmptyWindow(theEntry->fPacketSize);
				this->RemovePacket(packetIndex);
				//              qtss_printf("Expired packet %d\n", theEntry->fSeqNum);
				continue;
			}

			// Resend this packet
			fSocket->SendTo(fDestAddr, fDestPort, theEntry->fPacketData, theEntry->fPacketSize);
			//qtss_printf("Packet resent: %d\n", ((UInt16*)theEntry->fPacketData)[1]);

			theEntry->fNumResends++;    

			fNumResends++;

			numResends++;
			//qtss_printf("resend loop numResends=%" _S32BITARG_ " packet theEntry->fNumResends=%" _S32BITARG_ " stream fNumResends=\n",numResends,theEntry->fNumResends++, fNumResends);

			// ok -- let's try this.. add 1.5x of the INITIAL duration since the last send to the rto estimator
			// since we won't get an ack on this packet
			// this should keep us from exponentially increasing due to a one time increase
			// in the actual rtt, only AddToEstimate on the first resend ( assume that it's a dupe )
			// if it's not a dupe, but rather an actual loss, the subsequent actuals will bring down the average quickly

			if (theEntry->fNumResends == 1)
				fBandwidthTracker->AddToRTTEstimate((SInt32)((theEntry->fOrigRetransTimeout * 3) / 2));

			//          qtss_printf("Retransmitted packet %d\n", theEntry->fSeqNum);
			theEntry->fAddedTime = curTime;
			fBandwidthTracker->AdjustWindowForRetransmit();
			continue;
		}

	}
}

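As the code above shows, each entry is first checked against the retransmit timeout. Only if the timeout has passed is the entry then checked for expiry: an expired packet is removed directly with RemovePacket, and one that has not expired is resent.

Timeout check: if ((curTime - theEntry->fAddedTime) > fBandwidthTracker->CurRetransmitTimeout())

Expiry check: if (curTime > theEntry->fExpireTime)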
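The two checks combine into a three-way decision per entry, which the following sketch isolates as a pure function. ResendAction and Classify are hypothetical names used only for illustration; the comparisons are the same as in ResendDueEntries.

```cpp
#include <cassert>
#include <cstdint>

enum class ResendAction { Wait, Resend, Expire };

// Decides what to do with one unacknowledged packet at time nowMs.
// addedTimeMs is the time of the most recent send, not of the first one,
// because the DSS code resets fAddedTime after every retransmission.
inline ResendAction Classify(std::int64_t nowMs, std::int64_t addedTimeMs,
                             std::int64_t expireTimeMs,
                             std::int64_t retransmitTimeoutMs) {
    assert(retransmitTimeoutMs >= 0);
    if ((nowMs - addedTimeMs) <= retransmitTimeoutMs)
        return ResendAction::Wait;    // not yet due; the ack may still arrive
    if (nowMs > expireTimeMs)
        return ResendAction::Expire;  // stale; retransmitting is pointless
    return ResendAction::Resend;
}
```

One consequence of this ordering is that a packet past its expiry time is still kept until its retransmit timeout elapses. For instance, Classify(1600, 1550, 1500, 200) yields Wait, which matches the comment in the DSS source about giving the client a chance to ack before the packet is expired.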

One remaining point, how the retransmit timeout is computed from the RTT, will be explained in a later article.




