Kurento
我们通常称它为 Kurento media server
,简称为kms
,它是webrtc
下的媒体服务器。kms
像我们上一节使用的那样,我们会对kms
的流程进行简单的介绍。
kms
是基于GStreamer
进行开发的,有关GStreamer
开发可以找相应的资料。我们还是依照两个依据,一个是sdp
,另外一个是ice
。
在上一个例子中,我们使用的是kms
中java
的api
,kms
和java的api
也是通过socket进行通讯的。通过JsonRpc
来进行通讯,比如processOffer
是通过反射的方式invoke
调用到目标函数。
void
SdpEndpointImpl::invoke (std::shared_ptr<MediaObjectImpl> obj, const std::string &methodName, const Json::Value ¶ms, Json::Value &response)
{
...
if (methodName == "processOffer") {
kurento::JsonSerializer s (false);
SdpEndpointMethodProcessOffer method;
JsonSerializer responseSerializer (true);
std::string ret;
s.JsonValue = params;
method.Serialize (s);
ret = method.invoke (std::dynamic_pointer_cast<SdpEndpoint> (obj) );
responseSerializer.SerializeNVP (ret);
response = responseSerializer.JsonValue["ret"];
return;
}
...
SessionEndpointImpl::invoke (obj, methodName, params, response);
}
具体看看调用的是哪个函数:
/*
 * Processes a remote SDP offer and returns the generated SDP as a string.
 *
 * The offer is parsed, handed to the underlying element through the
 * "process-offer" signal, and the message the element returns is converted
 * back to text. A MediaSessionStarted event is emitted afterwards.
 *
 * Throws KurentoException with:
 *   - SDP_PARSE_ERROR                    when `offer` is empty;
 *   - SDP_END_POINT_ALREADY_NEGOTIATED   when an offer was already accepted;
 *   - SDP_END_POINT_PROCESS_OFFER_ERROR  when the element returns no result.
 */
std::string SdpEndpointImpl::processOffer (const std::string &offer)
{
  GstSDPMessage *offerSdp = nullptr, *result = nullptr;
  std::string offerSdpStr;
  bool expected = false;

  if (offer.empty () ) {
    throw KurentoException (SDP_PARSE_ERROR, "Empty offer not valid");
  }

  offerSdp = str_to_sdp (offer);

  // Atomically claim the negotiation; only the first caller flips the flag.
  if (!offerInProcess.compare_exchange_strong (expected, true) ) {
    // The endpoint is already negotiated. Release the parsed offer before
    // throwing, otherwise the message allocated above is leaked on this path.
    gst_sdp_message_free (offerSdp);
    throw KurentoException (SDP_END_POINT_ALREADY_NEGOTIATED,
                            "Endpoint already negotiated");
  }

  g_signal_emit_by_name (element, "process-offer", sessId.c_str (), offerSdp,
                         &result);
  gst_sdp_message_free (offerSdp);

  if (result == nullptr) {
    // Negotiation failed: clear the flag so a later offer can be processed.
    offerInProcess = false;
    throw KurentoException (SDP_END_POINT_PROCESS_OFFER_ERROR,
                            "Error processing offer");
  }

  // Despite its name, offerSdpStr holds the text of the element's result.
  sdp_to_str (offerSdpStr, result);
  gst_sdp_message_free (result);

  try {
    MediaSessionStarted event (shared_from_this (),
                               MediaSessionStarted::getName ());
    sigcSignalEmit(signalMediaSessionStarted, event);
  } catch (const std::bad_weak_ptr &e) {
    // shared_from_this() threw; log it and still return the SDP.
    GST_ERROR ("BUG creating %s: %s", MediaSessionStarted::getName ().c_str (),
               e.what ());
  }

  GST_INFO("creating sdp: %s",offerSdpStr.c_str());
  return offerSdpStr;
}
g_signal_emit_by_name (element, "process-offer"
调用了process-offer
的信号,其实是调用到了kms_sdp_session_process_offer
函数。接着调用到底层kms_sdp_agent_generate_answer
。
/*
 * Builds the SDP answer for `offer` (article excerpt: the `....` / `...` lines
 * mark elided code, which includes the `end` label and the declarations of
 * `answer` and `o`).
 *
 * On the path shown here the function returns `answer`. Each failing step
 * jumps to `end`; what `end` cleans up or returns is not visible in this
 * excerpt.
 */
static GstSDPMessage *
kms_sdp_agent_generate_answer (KmsSdpAgent * agent,
const GstSDPMessage * offer, GError ** error)
{
....
GST_INFO("gst_sdp_message_new");
/* Allocate the answer message; on failure report it through `error`. */
if (gst_sdp_message_new (&answer) != GST_SDP_OK) {
g_set_error_literal (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE,
"Can not allocate SDP answer");
goto end;
}
/* Apply the default session-level attributes to the answer.
 * NOTE(review): presumably the helper fills `error` itself on failure. */
if (!kms_sdp_agent_set_default_session_attributes (answer, error)) {
goto end;
}
/* Set the origin (o=) of the answer from `o`, which is prepared in the
 * elided part above. */
if (!kms_sdp_agent_set_origin (answer, &o, error)) {
goto end;
}
...
return answer;
}
gst_sdp_message_new
是GStreamer
的底层生成sdp
函数了。
ice的生成使用的是第三方库libnice
,可以参考这里。
对于kms
来说,它采用的是模块化的加载方式,会自动加载class_init
的函数。通过kms_ice_nice_agent_new_candidate_full
后,再到 kms_ice_nice_agent_get_candidate_sdp_string
的函数。
/*
 * Builds the candidate attribute string for `candidate`.
 *
 * libnice generates the SDP text for the local candidate; its first
 * SDP_CANDIDATE_ATTR_LEN characters are skipped and the remainder is appended
 * to SDP_CANDIDATE_ATTR followed by ":".
 *
 * Returns the string produced by g_strconcat (GLib documents that result as
 * newly allocated, to be released with g_free).
 */
static char *
kms_ice_nice_agent_get_candidate_sdp_string (NiceAgent * agent,
    NiceCandidate * candidate)
{
  gchar *nice_sdp;
  gchar *attr_line;

  nice_sdp = nice_agent_generate_local_candidate_sdp (agent, candidate);

  /* Replace libnice's own prefix with SDP_CANDIDATE_ATTR ":". */
  attr_line = g_strconcat (SDP_CANDIDATE_ATTR, ":",
      nice_sdp + SDP_CANDIDATE_ATTR_LEN, NULL);

  /* The libnice string is no longer needed once it has been copied. */
  g_free (nice_sdp);

  return attr_line;
}
nice_agent_generate_local_candidate_sdp
函数,就是libnice
的底层函数了。
生成的ice,通过socket发走:
// Subscribe to IceCandidateFound: every event is copied, serialized to JSON
// together with this object, and delivered asynchronously through the event
// handler that `wh` refers to.
sigc::connection conn = signalIceCandidateFound.connect ([ &, wh] (IceCandidateFound event) {
  // If wh.lock() yields an empty pointer there is nobody left to notify.
  std::shared_ptr<EventHandler> handler = wh.lock();

  if (!handler) {
    return;
  }

  // Keep private copies alive until the asynchronous send has run.
  std::shared_ptr<IceCandidateFound> eventCopy (new IceCandidateFound(event));
  auto self = this->shared_from_this();

  handler->sendEventAsync ([eventCopy, self, handler] {
    JsonSerializer serializer (true);

    serializer.Serialize ("data", eventCopy.get());
    serializer.Serialize ("object", self.get());
    serializer.JsonValue["type"] = "IceCandidateFound";

    handler->sendEvent (serializer.JsonValue);
  });
});
创建通道,在这个通道中进行数据的连接。
/*
 * Wires a newly exposed SSRC pad of `ssrcdemux` into the session.
 * NOTE(review): presumably connected to the demuxer's "new-ssrc-pad" signal -
 * confirm at the connection site.
 *
 * The negotiated media is chosen by matching `ssrc` against the session's
 * remote audio/video SSRC (or a mapping to the local one). For any other SSRC
 * the session manager may take over completely; otherwise a warning is logged
 * and the pads are still linked with a NULL media.
 *
 * The RTP pad `pad` and its sibling "rtcp_<pad name>" pad are each linked to a
 * sink pad requested from the session manager. All of this happens while the
 * session lock is held.
 */
static void
rtp_ssrc_demux_new_ssrc_pad (GstElement * ssrcdemux, guint ssrc, GstPad * pad,
    KmsBaseRtpSession * self)
{
  const gchar *rtp_name;
  gchar *rtcp_name;
  const GstSDPMedia *media;
  GstPad *rtp_sink;
  GstPad *rtcp_src;
  GstPad *rtcp_sink;

  GST_INFO("rtp_ssrc_demux_new_ssrc_pad");
  rtp_name = GST_OBJECT_NAME (pad);

  GST_INFO_OBJECT (self, "pad: %" GST_PTR_FORMAT " ssrc: %" G_GUINT32_FORMAT,
      pad, ssrc);

  KMS_SDP_SESSION_LOCK (self);

  if (self->remote_audio_ssrc == ssrc
      || ssrcs_are_mapped (ssrcdemux, self->local_audio_ssrc, ssrc)) {
    media = self->audio_neg;
  } else if (self->remote_video_ssrc == ssrc
      || ssrcs_are_mapped (ssrcdemux, self->local_video_ssrc, ssrc)) {
    media = self->video_neg;
  } else if (kms_i_rtp_session_manager_custom_ssrc_management (self->manager,
          self, ssrcdemux, ssrc, pad)) {
    /* The manager handled this SSRC itself; nothing left to link here. */
    goto end;
  } else {
    GST_WARNING_OBJECT (pad,
        "Ignoring media from non-matching SSRC: %" G_GUINT32_FORMAT, ssrc);
    media = NULL;
  }

  /* RTP: link the new pad to the sink provided by the manager. */
  rtp_sink =
      kms_i_rtp_session_manager_request_rtp_sink (self->manager, self, media);
  kms_base_rtp_session_link_pads (pad, rtp_sink);
  g_object_unref (rtp_sink);

  /* RTCP: the companion pad is named "rtcp_" + RTP pad name. */
  rtcp_name = g_strconcat ("rtcp_", rtp_name, NULL);
  rtcp_src = gst_element_get_static_pad (ssrcdemux, rtcp_name);
  g_free (rtcp_name);

  rtcp_sink =
      kms_i_rtp_session_manager_request_rtcp_sink (self->manager, self, media);
  kms_base_rtp_session_link_pads (rtcp_src, rtcp_sink);
  g_object_unref (rtcp_src);
  g_object_unref (rtcp_sink);

end:
  KMS_SDP_SESSION_UNLOCK (self);
}
完成了数据的交换。