当完成rtpbaseendpoint的初始化后,根据我们业务调用的逻辑。首先是在服务端调用genrateOff方法,从而生成服务端的sdp信息返回给远端RTP,
然后远端处理sdp后返回answer给本端rtp,再然后就是本端的sdpbaseendpoint继续answer这个offer后,进开始发送数据。流程大致如下

假如我们有A服务器的rtp端点需要和B服务器的rtp端点进行通信,具体的代码在kurento中调用方法的流程如下:
首先在A中,我们调用genrateOffer方法,实际是发送了1个generate_offer信号,引发了kmsbasesdpendpoint.c中注册的回调

kms_base_sdp_endpoint_class_init (KmsBaseSdpEndpointClass * klass)方法中
  klass->generate_offer = kms_base_sdp_endpoint_generate_offer;
  klass->process_offer = kms_base_sdp_endpoint_process_offer;
  klass->process_answer = kms_base_sdp_endpoint_process_answer;
A中将触发kms_base_sdp_endpoint_generate_offer方法,调用这个方法后,将调用
kms_base_sdp_endpoint_init_sdp_handlers (self, sess),初始化sdp处理类,这里升入解析后发现其实并么有真正处理SDP
然后通过 offer = kms_sdp_session_generate_offer (sess),这里才最终生成所需要的offer,详细生成offer的方法如下:

在kmssdpsession.c文件中.
GstSDPMessage *
kms_sdp_session_generate_offer (KmsSdpSession * self)
{
  GstSDPMessage *offer = NULL;
  GError *err = NULL;
  gchar *sdp_str = NULL;

//这里方法有引用了sdp_agent的create_offer方法,真是层层调用.
  offer = kms_sdp_agent_create_offer (self->agent, &err);
  if (err != NULL) {
    GST_ERROR_OBJECT (self, "Generating SDP Offer: %s", err->message);
    goto error;
  }

kms_sdp_agent_set_local_description (self->agent, offer, &err);
  if (err != NULL) {
    GST_ERROR_OBJECT (self, "Generating SDP Offer: %s", err->message);
    goto error;
  }

if (gst_sdp_message_copy (offer, &self->local_sdp) != GST_SDP_OK) {
    GST_ERROR_OBJECT (self, "Generating SDP Offer: gst_sdp_message_copy");
    goto error;
  }

GST_DEBUG_OBJECT (self, "Generated SDP Offer:\n%s",
      (sdp_str = gst_sdp_message_as_text (offer)));
  g_free (sdp_str);
  sdp_str = NULL;

return offer;

error:
  g_clear_error (&err);

if (offer != NULL) {
    gst_sdp_message_free (offer);
  }

return NULL;
}

继续关注sdpagent.c中的方法

GstSDPMessage *
kms_sdp_agent_create_offer (KmsSdpAgent * agent, GError ** error)
{
  g_return_val_if_fail (KMS_IS_SDP_AGENT (agent), NULL);

return KMS_SDP_AGENT_GET_CLASS (agent)->create_offer (agent, error);
}

继续看sdp_agent的create_offer方法的实现方法
  klass->create_offer = kms_sdp_agent_create_offer_impl;具体方法实现如下

static GstSDPMessage *
kms_sdp_agent_create_offer_impl (KmsSdpAgent * agent, GError ** error)
{
  GstSDPMessage *offer = NULL;
  GstSDPOrigin o;
  GSList *tmp = NULL;
  gboolean state_changed = FALSE;
  gboolean failed = TRUE;

SDP_AGENT_LOCK (agent);

if (agent->priv->state != KMS_SDP_AGENT_STATE_UNNEGOTIATED &&
      agent->priv->state != KMS_SDP_AGENT_STATE_NEGOTIATED) {
    g_set_error (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE,
        "Agent in state %s", SDP_AGENT_STATE (agent));
    goto end;
  }

if (gst_sdp_message_new (&offer) != GST_SDP_OK) {
    g_set_error_literal (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE,
        "Can not allocate SDP offer");
    goto end;
  }

if (!kms_sdp_agent_set_default_session_attributes (offer, error)) {
    goto end;
  }

if (agent->priv->state == KMS_SDP_AGENT_STATE_NEGOTIATED) {
    const GstSDPOrigin *orig;

orig = gst_sdp_message_get_origin (agent->priv->local_description);
    set_sdp_session_description (&agent->priv->local, orig->sess_id,
        orig->sess_version);
  } else {
    generate_sdp_session_description (&agent->priv->local);
  }

kms_sdp_agent_origin_init (agent, &o, agent->priv->local.id,
      agent->priv->local.version);

if (!kms_sdp_agent_set_origin (offer, &o, error)) {
    goto end;
  }

/* Execute pre-processing extensions */
  if (!kms_sdp_agent_offer_processing_extensions (agent, offer, TRUE, error)) {
    goto end;
  }

tmp = g_slist_copy_deep (agent->priv->offer_handlers,
      (GCopyFunc) sdp_handler_ref, NULL);
  kms_sdp_agent_merge_offer_handlers (agent);

/* Process medias */
  if (!kms_sdp_agent_create_media_offer (agent, offer, error)) {
    goto end;
  }

if (!kms_sdp_agent_update_session_version (agent, offer, error)) {
    goto end;
  }

/* Execute post-processing extensions */
  if (!kms_sdp_agent_offer_processing_extensions (agent, offer, FALSE, error)) {
    gst_sdp_message_free (offer);
    offer = NULL;
    goto end;
  }

g_slist_free_full (tmp, (GDestroyNotify) kms_ref_struct_unref);
  SDP_AGENT_NEW_STATE (agent, KMS_SDP_AGENT_STATE_LOCAL_OFFER);
  state_changed = TRUE;
  tmp = NULL;

failed = FALSE;

end:

if (tmp != NULL) {
    g_slist_free_full (agent->priv->offer_handlers,
        (GDestroyNotify) kms_ref_struct_unref);
    agent->priv->offer_handlers = tmp;
  }

SDP_AGENT_UNLOCK (agent);

if (failed && offer != NULL) {
    gst_sdp_message_free (offer);
    return NULL;
  }

if (state_changed) {
    g_object_notify_by_pspec (G_OBJECT (agent), obj_properties[PROP_STATE]);
  }

return offer;
}

其中几个主要的GST原生的结构体
GstSDPOrigin  ,  spd协议中o=xxx,后面的内容

/**
* GstSDPOrigin:
* @username: the user's login on the originating host, or it is "-"
*    if the originating host does not support the concept of user ids.
* @sess_id: is a numeric string such that the tuple of @username, @sess_id,
*    @nettype, @addrtype and @addr form a globally unique identifier for the
*    session.
* @sess_version: a version number for this announcement
* @nettype: the type of network. "IN" is defined to have the meaning
*    "Internet".
* @addrtype: the type of @addr.
* @addr: the globally unique address of the machine from which the session was
*     created.
*
* The contents of the SDP "o=" field which gives the originator of the session
* (their username and the address of the user's host) plus a session id and
* session version number.
*/
typedef struct {
  gchar *username;
  gchar *sess_id;
  gchar *sess_version;
  gchar *nettype;
  gchar *addrtype;
  gchar *addr;
} GstSDPOrigin;

sdp协议中,c=xxx后面的内容

/**
* GstSDPConnection:
* @nettype: the type of network. "IN" is defined to have the meaning
*    "Internet".
* @addrtype: the type of @address.
* @address: the address
* @ttl: the time to live of the address
* @addr_number: the number of layers
*
* The contents of the SDP "c=" field which contains connection data.
*/
typedef struct {
  gchar *nettype;
  gchar *addrtype;
  gchar *address;
  guint  ttl;
  guint  addr_number;
} GstSDPConnection;

sdp协议最后解析为结构体的格式

/**
 * GstSDPMessage:
 * @version: the protocol version
 * @origin: owner/creator and session identifier
 * @session_name: session name
 * @information: session information
 * @uri: URI of description
 * @emails: array of #gchar with email addresses
 * @phones: array of #gchar with phone numbers
 * @connection: connection information for the session
 * @bandwidths: array of #GstSDPBandwidth with bandwidth information
 * @times: array of #GstSDPTime with time descriptions
 * @zones: array of #GstSDPZone with time zone adjustments
 * @key: encryption key
 * @attributes: array of #GstSDPAttribute with session attributes
 * @medias: array of #GstSDPMedia with media descriptions
 *
 * The contents of the SDP message.
 */
typedef struct {
  gchar            *version;
  GstSDPOrigin      origin;
  gchar            *session_name;
  gchar            *information;
  gchar            *uri;
  GArray           *emails;
  GArray           *phones;
  GstSDPConnection  connection;
  GArray           *bandwidths;
  GArray           *times;
  GArray           *zones;
  GstSDPKey         key;
  GArray           *attributes;
  GArray           *medias;
} GstSDPMessage;

//根据调用rtp的业务流程,当创建sdp并且建立网络链接后,baseRtpEndpoint将根据media的类型来创建并添加1个数据拆包的pad,然后根据再根据拉取数据的endpoint数据类型,看后续如何处理
//具体调用方法

static void
kms_base_rtp_endpoint_rtpbin_pad_added(
    GstElement *rtpbin, GstPad *pad, KmsBaseRtpEndpoint *self)
{
    //agnostic这里可以认为是1个临时的中间pad,用于根据map协商的处理媒体信息来确认下一步连接什么处理的拆包pad
  //depayloader则是拆包的bin,如果拉流和推流是相同的媒体类型,则只是拆包
  //如果拉流和推流媒体类型不一致,并且都是系统支持的编码类型,则会自动进行转码
  GstElement *agnostic, *depayloader;
  gboolean added = TRUE;
  KmsMediaType media;
  GstCaps *caps;

GST_PAD_STREAM_LOCK(pad);

//检测如果是音频,获取1个音频转码的element
  if (g_str_has_prefix(GST_OBJECT_NAME(pad), AUDIO_RTPBIN_RECV_RTP_SRC))
  {
    agnostic = kms_element_get_audio_agnosticbin(KMS_ELEMENT(self));
    media = KMS_MEDIA_TYPE_AUDIO;
  }

//检测如果是视频,获取1个视频可以转码的element
  else if (g_str_has_prefix(
               GST_OBJECT_NAME(pad), VIDEO_RTPBIN_RECV_RTP_SRC))
  {
    agnostic = kms_element_get_video_agnosticbin(KMS_ELEMENT(self));
    media = KMS_MEDIA_TYPE_VIDEO;
//这里用于处理rtcp需要返回的相关信息
    if (self->priv->rl != NULL)
    {
      self->priv->rl->event_manager = kms_utils_remb_event_manager_create(pad);
    }
  }
  //如果是其他的数据,则不处理
  else
  {
    added = FALSE;
    goto end;
  }
//获取rtp中sdp携带的当前处理媒体流的负载类型信息
//caps=application/x-rtp, media=(string)video, payload=(int)96, clock-rate=(int)90000, encoding-name=(string)H264, packetization-mode=(string)1, sprop-parameter-sets=(string)"Z2QAH62EAQwgCGEAQwgCGEAQwgCEK1AoAt03AQEBQAAAAwBAAAAMoQ\=\=\,aO48sA\=\=", profile-level-id=(string)64001F
  caps = gst_pad_query_caps(pad, NULL);
  GST_ERROR_OBJECT(self,
                   "New pad: %" GST_PTR_FORMAT " for linking to %" GST_PTR_FORMAT
                   " with caps %" GST_PTR_FORMAT,
                   pad, agnostic, caps);
//根据rtp的sdp协商的信息来获取正确的拆包的gst对象,目前video支持的h264和vp8
  depayloader = kms_base_rtp_endpoint_get_depayloader_for_caps(caps);
  //释放caps,这里caps相当于再中间做一些存取的变量
  gst_caps_unref(caps);

//如果找到了对应的拆包对象
  if (depayloader != NULL)
  {
    GST_DEBUG_OBJECT(self, "Found depayloader %" GST_PTR_FORMAT, depayloader);
    kms_base_rtp_endpoint_update_stats(self, depayloader, media);
    gst_bin_add(GST_BIN(self), depayloader);
    gst_element_link_pads(depayloader, "src", agnostic, "sink");
    gst_element_link_pads(rtpbin, GST_OBJECT_NAME(pad), depayloader, "sink");
    gst_element_sync_state_with_parent(depayloader);
  }
  //如果没找到,则直接将数据输出到1个fakesink对象,这个相当于1个黑洞,类似于linux输出流到/dev/null
  else
  {
    GstElement *fake = gst_element_factory_make("fakesink", NULL);

g_object_set(fake, "async", FALSE, "sync", FALSE, NULL);

GST_WARNING_OBJECT(
        self, "Depayloder not found for pad %" GST_PTR_FORMAT, pad);

gst_bin_add(GST_BIN(self), fake);
    gst_element_link_pads(rtpbin, GST_OBJECT_NAME(pad), fake, "sink");
    gst_element_sync_state_with_parent(fake);
  }

end:
  GST_PAD_STREAM_UNLOCK(pad);

//如果添加pad成功,则发送1个MEDIA_START的信号,通知basertpendpoint的子对象,进行业务层面的处理
//给业务层出发1个启动媒体的事件
  if (added)
  {
    g_signal_emit(G_OBJECT(self), obj_signals[MEDIA_START], 0, media, TRUE);
  }
}

//其中关键的1步骤是根据caps获取到depayload的对象,方法如下:

static GstElement *
kms_base_rtp_endpoint_get_depayloader_for_caps(GstCaps *caps)
{
  GstElementFactory *factory;
  GstElement *depayloader = NULL;
  GList *payloader_list, *filtered_list, *l;

payloader_list = gst_element_factory_list_get_elements(
      GST_ELEMENT_FACTORY_TYPE_DEPAYLOADER, GST_RANK_NONE);
  filtered_list = gst_element_factory_list_filter(
      payloader_list, caps, GST_PAD_SINK, FALSE);

if (filtered_list == NULL)
  {
    goto end;
  }

for (l = filtered_list; l != NULL; l = l->next)
  {
    factory = GST_ELEMENT_FACTORY(l->data);

if (factory == NULL)
    {
      continue;
    }

if (g_strcmp0(gst_plugin_feature_get_name(factory), "asteriskh263") == 0)
    {
      /* Do not use asteriskh263 for H263 */
      continue;
    }

depayloader = gst_element_factory_create(factory, NULL);

if (depayloader != NULL)
    {
      kms_utils_depayloader_monitor_pts_out(depayloader);
      break;
    }
  }

end:
  gst_plugin_feature_list_free(filtered_list);
  gst_plugin_feature_list_free(payloader_list);

return depayloader;
}

//如果成功的找到了1个拆包的对象 , 则继续处理这个depayload输出的pts,并生成1个新的pts输出

void
kms_utils_depayloader_monitor_pts_out (GstElement * depayloader)
{
  GstPad *src_pad;

GST_INFO_OBJECT (depayloader, "Add probe: Adjust depayloader PTS out");
  //获取depayloader的src的pad
  src_pad = gst_element_get_static_pad (depayloader, "src");
  //添加1个发现缓存数据的检测时间,检测到数据后对根据服务器的时间对pts进行重新调整
  gst_pad_add_probe (src_pad,
      GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
      //具体调整pts的方法
      (GstPadProbeCallback) kms_utils_depayloader_pts_out_probe,
      kms_utils_adjust_pts_data_new (depayloader),
      (GDestroyNotify) kms_utils_adjust_pts_data_destroy);
  g_object_unref (src_pad);
}

//具体处理拆包后数据的pts的方法

static GstPadProbeReturn
kms_utils_depayloader_pts_out_probe (GstPad * pad, GstPadProbeInfo * info,
    AdjustPtsData * data)
{
  if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
    GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);

buffer = gst_buffer_make_writable (buffer);
    kms_utils_depayloader_adjust_pts_out (data, buffer);
    GST_PAD_PROBE_INFO_DATA (info) = buffer;
  }
  else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
    GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);

list = gst_buffer_list_make_writable (list);
    gst_buffer_list_foreach (list,
        (GstBufferListFunc) kms_utils_depayloader_pts_out_it, data);
    GST_PAD_PROBE_INFO_DATA (info) = list;
  }
  return GST_PAD_PROBE_OK;
}

//这里,rtpEndpoint的调用及拆包就完成了。

//接下来,如果我们要增加kurento支持的rtpEndpoint拆包的类型,那就要看看depayloader是如何生成的。我们将在下一章继续分析

Kurento 源码解析系列(4)- RtpEndpoint端点c层代码的调用处理相关推荐

  1. Kurento 源码解析系列(3)- RtpEndpoint端点初始化解析

    关于kurento的rtpendpoint 从客户端发起rpc调用后,在服务器内部如何从cpp层到c层进行相关实例化 需要先回顾以下每个可直接实例化对象的创建的流程  首先是JAVA客户端通过rpc调 ...

  2. Kurento 源码解析系列(1)- PlayerEndpoint的play方法

    PlayerEndpoint.cpp 播放的流程解析 补充一个将源项目各个子项目整合并编译以后的工程地址 https://github.com/ywcai/JKms.git 针对如何编译源代码并在vs ...

  3. Redux 源码解析系列(一) -- Redux的实现思想

    文章来源: IMweb前端社区 黄qiong(imweb.io) IMweb团队正在招聘啦,简历发至jayccchen@tencent.com Redux 其实是用来帮我们管理状态的一个框架,它暴露给 ...

  4. TiKV 源码解析系列文章(二)raft-rs proposal 示例情景分析

    作者:屈鹏 本文为 TiKV 源码解析系列的第二篇,按照计划首先将为大家介绍 TiKV 依赖的周边库 raft-rs .raft-rs 是 Raft 算法的 Rust 语言实现.Raft 是分布式领域 ...

  5. Tomcat源码解析系列二:Tomcat总体架构

    Tomcat即是一个HTTP服务器,也是一个servlet容器,主要目的就是包装servlet,并对请求响应相应的servlet,纯servlet的web应用似乎很好理解Tomcat是如何装载serv ...

  6. prometheus变量_TiKV 源码解析系列文章(四)Prometheus(下)

    本文为 TiKV 源码解析系列的第四篇,接上篇继续为大家介绍 rust-prometheus.上篇主要介绍了基础知识以及最基本的几个指标的内部工作机制,本篇会进一步介绍更多高级功能的实现原理. 与上篇 ...

  7. TiKV 源码解析系列 - Raft 的优化

    这篇文章转载TiDB大牛 唐刘 的博客:https://mp.weixin.qq.com/s?__biz=MzI3NDIxNTQyOQ==&mid=2247484544&idx=1&a ...

  8. Netty 源码解析系列-服务端启动流程解析

    netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...

  9. Mybatis3 源码解析系列

    简介 Mybatis作为一个优秀的Java持久化框架,在我们的日常工作中相信都会用到,本次源码解析系列,就开始探索下Mybatis 总结 在MyBatis的学习中,首先通读了<MyBatis3源 ...

最新文章

  1. android7.1增加一个开机自启动的bin应用遇到的权限问题
  2. 电脑模拟器哪个好_《英雄聯盟:激鬥峽谷》电脑版哪个安卓模拟器好用?《英雄聯盟:激鬥峽谷》手游电脑版怎么玩...
  3. VC维与DNN的Boundary
  4. dateframe行列插入和删除操作
  5. C++入门指南及实战 第一步 概述及经典HelloWorld
  6. 简单的测试可以防止最严重的故障
  7. 嘉益仕(Litns)带您读懂MES系统:选型篇
  8. Hibernate多表查询重新封装实体
  9. Linux 启动流程即init程序分析--1
  10. Python3利用BeautifulSoup4抓取站点小说全文的代码
  11. Android背景图片设置
  12. 学生成绩管理系统mysql课程设计_数据库课程设计报告-学生成绩管理系统
  13. css ul1,CSS 列表样式 ul
  14. c语言定义浮点变量i和j,2012年计算机等级考试二级C语言基础教程:数据类型、变量和运算符...
  15. A. Arena of Greed【贪心+特判】
  16. BP神经网络以及python实现
  17. Python 读写IC卡、复制IC卡
  18. 校招生向京东发起的“攻势”,做到他这样,你,也可以
  19. 视频教程-微信小程序商城-界面设计实战教学(含源代码)-微信开发
  20. 【渝粤题库】广东开放大学 国际标准化 形成性考核 (2)

热门文章

  1. 一文读读懂SVM推导全过程
  2. JavaScript的window.onload事件的理解
  3. shuffle算法c语言,C#Shuffle算法(洗牌算法、抽样算法)
  4. Kali渗透-NMAP高级使用技巧和漏洞扫描发现
  5. 如何快速学习abaqus复合材料--【理论+实操】
  6. upload video
  7. 首届“敏捷中国”开发者大会明日召开
  8. 2021四川书法高考成绩查询,2021年四川书法水平测试成绩公布时间及评分标准
  9. SourceTree的安装和使用教程
  10. matlab生成动态函数图像