2019年3月写的文章了,后续也没有再跟进这个项目有没有新变化。
MediaSoup是一个开源的SFU库,分为客户端和服务端。服务端分为JS层和C++层,C++层用于处理媒体和SDP等数据。我个人主要关注媒体相关的处理,也就是RTP和RTCP相关的处理。我们的项目不会用到这个项目,看它的代码主要是解决我的两个疑问:
第一个问题:它不做任何处理。接收端仅仅完成了接收端拥塞控制中该做的那部分带宽估计工作,不会根据其他端的接收情况(下行带宽)去调节发送端的带宽。
第二个问题:服务端相当于同时扮演一个客户端的接收端和一个发送端。接收端要做好的工作是保证自己收到完整的包(NACK/FIR等),并把接收端的统计信息(RR)反馈给发送端。发送端则接收每一个接收端的RTCP请求并做相应的处理,但它仅仅支持丢包相关的处理,不会处理码率相关的工作;理论上它应该根据每一个接收端的接收能力得出一个合理的码率,并把这个码率告诉真实的发送端。
WebRtcTransport::OnRtpDataRecv
// 只保留媒体相关的处理逻辑,方便阅读
/**
 * Handles one incoming SRTP datagram.
 *
 * Steps, in order: decrypt the buffer, optionally mirror it, parse it into an
 * RtpPacket, register the transport's header extension ids on the packet,
 * feed the remote bitrate estimator when an abs-send-time value can be read,
 * and finally hand the packet to the Producer returned by the RTP listener.
 * The parsed packet is deleted before returning.
 *
 * @param tuple Transport tuple associated with the data (not referenced in
 *              the code shown here).
 * @param data  Buffer holding the received packet.
 * @param len   Length of the buffer. It is passed by pointer to DecryptSrtp(),
 *              so it may be modified there (presumably to the decrypted
 *              length; confirm against DecryptSrtp()).
 */
inline void WebRtcTransport::OnRtpDataRecv(RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
{
    // Give up right away if the SRTP packet cannot be decrypted.
    if (!this->srtpRecvSession->DecryptSrtp(data, &len))
        return;

    // Send the data to the mirror tuple when RTP mirroring is configured.
    const bool mirrorRtp = this->mirrorTuple != nullptr && this->mirroringOptions.recvRtp;
    if (mirrorRtp)
        this->mirrorTuple->Send(data, len);

    RTC::RtpPacket* rtpPacket = RTC::RtpPacket::Parse(data, len);
    if (rtpPacket == nullptr)
        return;

    // Register the Transport RTP header extension ids on the packet so the
    // RTP listener can use them. An id of 0 means "not set".
    auto& extIds = this->headerExtensionIds;
    if (extIds.absSendTime != 0u)
        rtpPacket->AddExtensionMapping(RtpHeaderExtensionUri::Type::ABS_SEND_TIME, extIds.absSendTime);
    if (extIds.mid != 0u)
        rtpPacket->AddExtensionMapping(RtpHeaderExtensionUri::Type::MID, extIds.mid);
    if (extIds.rid != 0u)
        rtpPacket->AddExtensionMapping(RtpHeaderExtensionUri::Type::RTP_STREAM_ID, extIds.rid);

    // Feed the remote bitrate estimator (REMB), but only when the packet
    // yields an abs-send-time value.
    uint32_t absSendTime;
    if (rtpPacket->ReadAbsSendTime(&absSendTime))
    {
        this->remoteBitrateEstimator->IncomingPacket(
            DepLibUV::GetTime(), rtpPacket->GetPayloadLength(), *rtpPacket, absSendTime);
    }

    // Deliver the packet to its Producer, if the listener knows one.
    RTC::Producer* producer = this->rtpListener.GetProducer(rtpPacket);
    if (producer != nullptr)
        producer->ReceiveRtpPacket(rtpPacket);

    // The packet is owned here in both cases.
    delete rtpPacket;
}
Producer::ReceiveRtpPacket
/**
 * Processes an RTP packet received for this Producer and dispatches it to
 * every registered listener.
 *
 * The packet's SSRC is first looked up as a media SSRC in
 * mapSsrcRtpStreamInfo; if that fails, the entries are scanned for a matching
 * RTX SSRC. In both cases the packet is cloned into ClonedPacketBuffer and the
 * clone is what gets processed and dispatched. The function returns without
 * notifying listeners when the stream rejects the packet or when no stream
 * matches the SSRC.
 *
 * @param packet The received RTP packet. The caller keeps ownership; the
 *               clone made here is released when this function returns.
 */
void Producer::ReceiveRtpPacket(RTC::RtpPacket* packet)
{
    // May need to create a new RtpStreamRecv.
    MayNeedNewStream(packet);

    // Find the corresponding RtpStreamRecv.
    const uint32_t ssrc = packet->GetSsrc();
    RTC::RtpStreamRecv* rtpStream{ nullptr };
    RTC::RtpEncodingParameters::Profile profile;
    std::unique_ptr<RTC::RtpPacket> clonedPacket;

    // Single lookup instead of find() followed by repeated operator[].
    auto it = this->mapSsrcRtpStreamInfo.find(ssrc);

    if (it != this->mapSsrcRtpStreamInfo.end())
    {
        // Media RTP stream found.
        auto& info = it->second;

        rtpStream = info.rtpStream;
        profile   = info.profile;

        // Work on a clone from here on.
        clonedPacket.reset(packet->Clone(ClonedPacketBuffer));
        packet = clonedPacket.get();

        // Process the packet.
        if (!rtpStream->ReceivePacket(packet))
            return;
    }
    else
    {
        // Otherwise look for a stream whose RTX SSRC matches.
        for (auto& kv : this->mapSsrcRtpStreamInfo)
        {
            auto& info = kv.second;

            if (info.rtxSsrc != 0u && info.rtxSsrc == ssrc)
            {
                rtpStream = info.rtpStream;
                profile   = info.profile;

                clonedPacket.reset(packet->Clone(ClonedPacketBuffer));
                packet = clonedPacket.get();

                // Process the packet.
                if (!rtpStream->ReceiveRtxPacket(packet))
                    return;

                // Packet repaired after applying RTX.
                rtpStream->packetsRepaired++;

                break;
            }
        }
    }

    // No stream matched the SSRC: `profile` was never assigned, so the packet
    // must not be forwarded (doing so would read an indeterminate value).
    if (rtpStream == nullptr)
        return;

    ApplyRtpMapping(packet);

    for (auto& listener : this->listeners)
    {
        listener->OnProducerRtpPacket(this, packet, profile);
    }
}
Consumer::SendRtpPacket
/**
 * Sends one RTP packet through this Consumer's transport.
 *
 * The packet is dropped if its payload type is not supported. When a profile
 * switch is pending and the packet belongs to the target profile, the switch
 * is performed on a key frame (or immediately for codecs that cannot carry
 * key frames). The packet's SSRC, sequence number and timestamp are rewritten
 * for sending and restored afterwards, so the packet is left as it was
 * received.
 *
 * @param packet  The RTP packet to send. It is modified temporarily and
 *                restored before returning, except on the early-return paths.
 * @param profile The profile this packet belongs to.
 */
void Consumer::SendRtpPacket(RTC::RtpPacket* packet, RTC::RtpEncodingParameters::Profile profile)
{
    // Map the payload type.
    auto payloadType = packet->GetPayloadType();

    // NOTE: This may happen if this Consumer supports just some codecs of those
    // in the corresponding Producer.
    if (this->supportedCodecPayloadTypes.find(payloadType) == this->supportedCodecPayloadTypes.end())
        return;

    // Check whether this is the key frame we are waiting for in order to update
    // the effective profile.
    if (this->effectiveProfile != this->targetProfile && profile == this->targetProfile)
    {
        const bool canBeKeyFrame = Codecs::CanBeKeyFrame(this->rtpStream->GetMimeType());
        bool isKeyFrame          = false;

        if (canBeKeyFrame && packet->IsKeyFrame())
        {
            isKeyFrame = true;
        }

        // Switch on a key frame, or right away when the codec has no key frames.
        // This check must sit outside the key-frame branch above, otherwise the
        // `!canBeKeyFrame` case could never be reached.
        if (isKeyFrame || !canBeKeyFrame)
        {
            SetEffectiveProfile(this->targetProfile);

            // Resynchronize the stream.
            this->syncRequired = true;

            // Clear RTP retransmission buffer to avoid congesting the receiver by
            // sending useless retransmissions (now that we are sending a newer key frame).
            this->rtpStream->ClearRetransmissionBuffer();

            // Stop probation if probing profile is the new effective profile.
            if (IsProbing() && this->probingProfile == this->effectiveProfile)
                StopProbation();
        }
    }

    // NOTE(review): isSyncPacket is set below but not read anywhere in the code shown here.
    bool isSyncPacket = false;

    if (this->syncRequired)
    {
        isSyncPacket = true;

        this->rtpSeqManager.Sync(packet->GetSequenceNumber());
        this->rtpTimestampManager.Sync(packet->GetTimestamp());

        // Calculate RTP timestamp diff between now and last sent RTP packet.
        if (this->rtpStream->GetMaxPacketMs() != 0u)
        {
            auto now    = DepLibUV::GetTime();
            auto diffMs = now - this->rtpStream->GetMaxPacketMs();
            // Convert milliseconds to RTP clock ticks.
            auto diffTs = diffMs * this->rtpStream->GetClockRate() / 1000;

            this->rtpTimestampManager.Offset(diffTs);
        }

        this->syncRequired = false;

        if (this->encodingContext)
            this->encodingContext->SyncRequired();
    }

    // Rewrite payload if needed. Drop packet if necessary.
    if (this->encodingContext && !packet->EncodePayload(this->encodingContext.get()))
    {
        this->rtpSeqManager.Drop(packet->GetSequenceNumber());
        this->rtpTimestampManager.Drop(packet->GetTimestamp());

        return;
    }

    // Update RTP seq number and timestamp.
    uint16_t rtpSeq;
    uint32_t rtpTimestamp;

    this->rtpSeqManager.Input(packet->GetSequenceNumber(), rtpSeq);
    this->rtpTimestampManager.Input(packet->GetTimestamp(), rtpTimestamp);

    // Save the original values so they can be restored after sending.
    auto origSsrc      = packet->GetSsrc();
    auto origSeq       = packet->GetSequenceNumber();
    auto origTimestamp = packet->GetTimestamp();

    // Rewrite packet.
    packet->SetSsrc(this->rtpParameters.encodings[0].ssrc);
    packet->SetSequenceNumber(rtpSeq);
    packet->SetTimestamp(rtpTimestamp);

    // Process the packet.
    if (this->rtpStream->ReceivePacket(packet))
    {
        // Send the packet.
        this->transport->SendRtpPacket(packet);

        // Retransmit the RTP packet if probing.
        if (IsProbing())
            SendProbation(packet);
    }

    // Restore the original header fields (presumably because the caller keeps
    // using the same packet object; confirm against the caller).
    packet->SetSsrc(origSsrc);
    packet->SetSequenceNumber(origSeq);
    packet->SetTimestamp(origTimestamp);

    // Restore the original payload if needed.
    if (this->encodingContext)
        packet->RestorePayload();

    // Run probation if needed.
    if (this->kind == RTC::Media::Kind::VIDEO && --this->rtpPacketsBeforeProbation == 0)
    {
        this->rtpPacketsBeforeProbation = RtpPacketsBeforeProbation;
        MayRunProbation();
    }
}