当前位置: 首页 > 工具软件 > Kurento > 使用案例 >

webrtc入门:10.Kurento流程分析

江宏放
2023-12-01

Kurento 我们通常成他为 Kurento media server ,简称为kms,他是webrtc下的媒体服务器。kms 像我们上一节使用的那样,我们会对kms的流程进行简单的介绍。

kms是基于GStreamer下进行开发的,有关GStreamer开发可以找相应的资料。我们还是依照两个依据,一个是sdp,另外一个是ice

sdp的生成

在上一个例子中,我们使用的是kmsjavaapikms和java的api也是通过socket进行通讯的。通过JsonRpc来进行通讯,比如processOffer是通过反射的方式invoke 调用到目标函数。

void
SdpEndpointImpl::invoke (std::shared_ptr<MediaObjectImpl> obj, const std::string &methodName, const Json::Value &params, Json::Value &response)
{
...

  if (methodName == "processOffer") {
    kurento::JsonSerializer s (false);
    SdpEndpointMethodProcessOffer method;
    JsonSerializer responseSerializer (true);
    std::string ret;

    s.JsonValue = params;
    method.Serialize (s);

    ret = method.invoke (std::dynamic_pointer_cast<SdpEndpoint> (obj) );
    responseSerializer.SerializeNVP (ret);
    response = responseSerializer.JsonValue["ret"];
    return;
  }
...

  SessionEndpointImpl::invoke (obj, methodName, params, response);
}

具体看看调用的是哪个函数:

std::string SdpEndpointImpl::processOffer (const std::string &offer)
{
  GstSDPMessage *offerSdp = nullptr, *result = nullptr;
  std::string offerSdpStr;
  bool expected = false;

  if (offer.empty () ) {
    throw KurentoException (SDP_PARSE_ERROR, "Empty offer not valid");
  }

  offerSdp = str_to_sdp (offer);

  if (!offerInProcess.compare_exchange_strong (expected, true) ) {
    //the endpoint is already negotiated
    throw KurentoException (SDP_END_POINT_ALREADY_NEGOTIATED,
                            "Endpoint already negotiated");
  }

  g_signal_emit_by_name (element, "process-offer", sessId.c_str (), offerSdp,
                         &result);
  gst_sdp_message_free (offerSdp);

  if (result == nullptr) {
    offerInProcess = false;
    throw KurentoException (SDP_END_POINT_PROCESS_OFFER_ERROR,
                            "Error processing offer");
  }

  sdp_to_str (offerSdpStr, result);
  gst_sdp_message_free (result);

  try {
    MediaSessionStarted event (shared_from_this (),
        MediaSessionStarted::getName ());
    sigcSignalEmit(signalMediaSessionStarted, event);
  } catch (const std::bad_weak_ptr &e) {
    // shared_from_this()
    GST_ERROR ("BUG creating %s: %s", MediaSessionStarted::getName ().c_str (),
        e.what ());
  }

  GST_INFO("creating sdp: %s",offerSdpStr.c_str());

  return offerSdpStr;
}

g_signal_emit_by_name (element, "process-offer" 调用了process-offer的信号,其实是调用到了kms_sdp_session_process_offer函数。接着调用到底层kms_sdp_agent_generate_answer

static GstSDPMessage *
kms_sdp_agent_generate_answer (KmsSdpAgent * agent,
    const GstSDPMessage * offer, GError ** error)
{
  ....
GST_INFO("gst_sdp_message_new");
  if (gst_sdp_message_new (&answer) != GST_SDP_OK) {
    g_set_error_literal (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE,
        "Can not allocate SDP answer");
    goto end;
  }

  if (!kms_sdp_agent_set_default_session_attributes (answer, error)) {
    goto end;
  }

  if (!kms_sdp_agent_set_origin (answer, &o, error)) {
    goto end;
  }

  ...

  return answer;
}

gst_sdp_message_newGStreamer的底层生成sdp函数了。

ice

ice的生成使用的是 地方库libnice,可以参考这里

对于kms来说,他是一种模块化的加载方式,自动会加载class_init的函数。通过kms_ice_nice_agent_new_candidate_full后,再到 kms_ice_nice_agent_new_candidate_full的函数。

static char *
kms_ice_nice_agent_get_candidate_sdp_string (NiceAgent * agent,
    NiceCandidate * candidate)
{
  //printf("nice_agent_generate_local_candidate_sdp\r\n");
  gchar *str = nice_agent_generate_local_candidate_sdp (agent, candidate);
  gchar *cand = g_strconcat (SDP_CANDIDATE_ATTR, ":",
      (str + SDP_CANDIDATE_ATTR_LEN), NULL);

  g_free (str);

  return cand;
}

nice_agent_generate_local_candidate_sdp 函数,就是libnice的底层函数了。

生成的ice,通过socket发走:

sigc::connection conn = signalIceCandidateFound.connect ([ &, wh] (IceCandidateFound event) {
      //std::cout <<"android pp WebRtcEndpointImpl::connect IceCandidateFound event: "<<"signalIceCandidateFound " << std::endl;
      std::shared_ptr<EventHandler> lh = wh.lock();
      if (!lh)
        return;

      std::shared_ptr<IceCandidateFound> ev_ref (new IceCandidateFound(event));
      auto object = this->shared_from_this();

      lh->sendEventAsync ([ev_ref, object, lh] {
        JsonSerializer s (true);

        s.Serialize ("data", ev_ref.get());
        s.Serialize ("object", object.get());
        s.JsonValue["type"] = "IceCandidateFound";

        lh->sendEvent (s.JsonValue);
      });
    });

如何连接

创建通道,在这个通道中进行数据的连接。

static void
rtp_ssrc_demux_new_ssrc_pad (GstElement * ssrcdemux, guint ssrc, GstPad * pad,
    KmsBaseRtpSession * self)
{
  GST_INFO("rtp_ssrc_demux_new_ssrc_pad");
  const gchar *rtp_pad_name = GST_OBJECT_NAME (pad);
  gchar *rtcp_pad_name;
  const GstSDPMedia *media;
  GstPad *src, *sink;

  GST_INFO_OBJECT (self, "pad: %" GST_PTR_FORMAT " ssrc: %" G_GUINT32_FORMAT,
      pad, ssrc);

  KMS_SDP_SESSION_LOCK (self);

  if (self->remote_audio_ssrc == ssrc
      || ssrcs_are_mapped (ssrcdemux, self->local_audio_ssrc, ssrc)) {
    media = self->audio_neg;
  } else if (self->remote_video_ssrc == ssrc
      || ssrcs_are_mapped (ssrcdemux, self->local_video_ssrc, ssrc)) {
    media = self->video_neg;
  } else {
    if (kms_i_rtp_session_manager_custom_ssrc_management (self->manager, self,
            ssrcdemux, ssrc, pad)) {
      goto end;
    } else {
      GST_WARNING_OBJECT (pad,
          "Ignoring media from non-matching SSRC: %" G_GUINT32_FORMAT, ssrc);

      media = NULL;
    }
  }

  /* RTP */
  sink = kms_i_rtp_session_manager_request_rtp_sink (self->manager, self, media);
  kms_base_rtp_session_link_pads (pad, sink);
  g_object_unref (sink);

  /* RTCP */
  rtcp_pad_name = g_strconcat ("rtcp_", rtp_pad_name, NULL);
  src = gst_element_get_static_pad (ssrcdemux, rtcp_pad_name);
  g_free (rtcp_pad_name);
  sink = kms_i_rtp_session_manager_request_rtcp_sink (self->manager, self, media);
  kms_base_rtp_session_link_pads (src, sink);
  g_object_unref (src);
  g_object_unref (sink);

end:
  KMS_SDP_SESSION_UNLOCK (self);
}

完成了数据的交换。

 类似资料: