网络模块介绍

webrtc 中音视频数据是通过udp传输,所以srs4.0要支持webrtc得开启一个udp server进行收发流。
具体代码见文件:srs_app_rtc_server.cpp
监听逻辑

srs_error_t SrsRtcServer::listen_udp()
{//首先获得监听的IP、PORTint port = _srs_config->get_rtc_server_listen();string ip = srs_any_address_for_listener();srs_assert(listeners.empty());//基于配置文件中的reuseport值,决定在同一个端口进行监听的次数。//默认创建一个socket监听一次。如果reuseport大于一就会创建多个socket监听多次// 每个SrsUdpMuxListener内部会启动一个协程进行网络收发int nn_listeners = _srs_config->get_rtc_server_reuseport();for (int i = 0; i < nn_listeners; i++) { //把该类的this指针传进去,接收到网络数据后回传到SrsRtcServerSrsUdpMuxListener* listener = new SrsUdpMuxListener(this, ip, port);if ((err = listener->listen()) != srs_success) {srs_freep(listener);return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);}listeners.push_back(listener);}return err;
}

SrsUdpMuxListener 网络处理类

启动监听

srs_error_t SrsUdpMuxListener::listen()
{srs_error_t err = srs_success;if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) {return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);}//启动一个协程接收网络数据srs_freep(trd);trd = new SrsSTCoroutine("udp", this, cid);if ((err = trd->start()) != srs_success) {return srs_error_wrap(err, "start thread");}return err;
}

修改udp 接收、发送buffer 大小,发送接收buffer 都设置为10M

void SrsUdpMuxListener::set_socket_buffer()
{int default_sndbuf = 0;// TODO: FIXME: Config it.int expect_sndbuf = 1024*1024*10; // 10Mint actual_sndbuf = expect_sndbuf;int r0_sndbuf = 0;if (true) {if ((r0_sndbuf = setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, sizeof(actual_sndbuf))) < 0) {srs_warn("set SO_SNDBUF failed, expect=%d, r0=%d", expect_sndbuf, r0_sndbuf);}}int default_rcvbuf = 0;// TODO: FIXME: Config it.int expect_rcvbuf = 1024*1024*10; // 10Mint actual_rcvbuf = expect_rcvbuf;int r0_rcvbuf = 0;if (true) {if ((r0_rcvbuf = setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, sizeof(actual_rcvbuf))) < 0) {srs_warn("set SO_RCVBUF failed, expect=%d, r0=%d", expect_rcvbuf, r0_rcvbuf);}}
}

接收网络数据

srs_error_t SrsUdpMuxListener::cycle()
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "udp listener");}nn_loop++;int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT);if (nread <= 0) {if (nread < 0) {srs_warn("udp recv error nn=%d", nread);}// remux udp never returncontinue;}nn_msgs++;nn_msgs_stage++;// Handle the UDP packet.//这里handler就是初始化SrsUdpMuxListener是传进来的SrsRtcServer//所以收到的网络数据最终传到SrsRtcServer的on_udp_packet()函数err = handler->on_udp_packet(&skt);}
}

处理UDP 数据包

有四种类型的UDP 包:stun 、dtls、srtp、srtcp,基于包头的类型分别处理这四种包
如:判断是否是rtcp

// For RTCP, PT is [128, 223] (or without marker [0, 95]).
// Literally, RTCP starts from 64 not 0, so PT is [192, 223] (or without marker [64, 95]).
// @note For RTP, the PT is [96, 127], or [224, 255] with marker.
bool srs_is_rtcp(const uint8_t* data, size_t len)
{return (len >= 12) && (data[0] & 0x80) && (data[1] >= 192 && data[1] <= 223);
}

处理收到的udp包,根据包类型分别处理上面提到的四种包

srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
{SrsRtcConnection* session = NULL;uint64_t fast_id = skt->fast_id();// Try fast id first, if not found, search by long peer id.//基于用户的IP 、PORT信息找到该用户if (fast_id) {session = (SrsRtcConnection*)_srs_rtc_manager->find_by_fast_id(fast_id);}if (!session) {string peer_id = skt->peer_id();session = (SrsRtcConnection*)_srs_rtc_manager->find_by_id(peer_id);}// For STUN, the peer address may change.处理stun包if (!is_rtp_or_rtcp && srs_is_stun((uint8_t*)data, size)) {SrsStunPacket ping;if ((err = ping.decode(data, size)) != srs_success) {return srs_error_wrap(err, "decode stun packet failed");}return session->on_stun(skt, &ping);}// Note that we don't(except error) switch to the context of session, for performance issue.//处理rtp包if (is_rtp_or_rtcp && !is_rtcp) {++_srs_pps_rrtps->sugar;err = session->on_rtp(data, size);if (err != srs_success) {session->switch_to_context();}return err;}session->switch_to_context();//处理rtcp包if (is_rtp_or_rtcp && is_rtcp) {++_srs_pps_rrtcps->sugar;return session->on_rtcp(data, size);}//处理dtls包if (srs_is_dtls((uint8_t*)data, size)) {++_srs_pps_rstuns->sugar;return session->on_dtls(data, size);}
}

处理rtp

首先确定该包是谁发的,即找到该包的发送者

srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
{srs_error_t err = srs_success;SrsRtcPublishStream* publisher = NULL;if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) {return srs_error_wrap(err, "find");}srs_assert(publisher);return publisher->on_rtp(data, nb_data);
}

基于包中的ssrc找到发布者

srs_error_t SrsRtcConnection::find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher)
{srs_error_t err = srs_success;if (publishers_.size() == 0) {return srs_error_new(ERROR_RTC_RTCP, "no publisher");}//从包中解析出ssrcuint32_t ssrc = srs_rtp_fast_parse_ssrc(buf, size);if (ssrc == 0) {return srs_error_new(ERROR_RTC_NO_PUBLISHER, "invalid ssrc");}//基于ssrc找到该包的发布者map<uint32_t, SrsRtcPublishStream*>::iterator it = publishers_ssrc_map_.find(ssrc);if(it == publishers_ssrc_map_.end()) {return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no publisher for ssrc:%u", ssrc);}*ppublisher = it->second;return err;
}

发布者处理rtp 包,主要进行解密,然后基于ssrc分别把数据传给videotrack 、audioTrack进行处理

srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
{//解密 srtp ,得到 rtpif ((err = session_->transport_->unprotect_rtp(plaintext, &nb_plaintext)) != srs_success) {}//处理rtpif ((err = on_rtp_plaintext(plaintext, nb_plaintext)) != srs_success) { err = do_on_rtp_plaintext(pkt, &buf);}
}

do_on_rtp_plaintext()函数把数据传给SrsRtcVideoRecvTrack、SrsRtcAudioRecvTrack。
Track中的source是 SrsRtcStream* source;

 srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket2*& pkt, SrsBuffer* buf){// For source to consume packet.uint32_t ssrc = pkt->header.get_ssrc();SrsRtcAudioRecvTrack* audio_track = get_audio_track(ssrc);SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc);if (audio_track) {pkt->frame_type = SrsFrameTypeAudio;if ((err = audio_track->on_rtp(source, pkt)) != srs_success) {return srs_error_wrap(err, "on audio");}} else if (video_track) {pkt->frame_type = SrsFrameTypeVideo;if ((err = video_track->on_rtp(source, pkt)) != srs_success) {return srs_error_wrap(err, "on video");}}}

把流转发给该发布者的所有消费者(订阅者), 流没有立即传发给消费者的网络模块,而是进入每个消费者的队列,消费者会启动一个新的协程 从队列中读取数据,再发到网络层。

srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt)
{srs_error_t err = srs_success;for (int i = 0; i < (int)consumers.size(); i++) {SrsRtcConsumer* consumer = consumers.at(i);if ((err = consumer->enqueue(pkt->copy())) != srs_success) {return srs_error_wrap(err, "consume message");}}return err;
}

下篇文章再分析消费者逻辑

如果有帮助欢迎关注我哈

srs4.0 webrtc分析(五)网络模块相关推荐

  1. srs4.0 webrtc分析(四)推流、播放类逻辑分析

    介绍 分析srs4.0 webrtc 模块 ,推流端对应SrsRtcPublishStream.播放端对应SrsRtcPlayStream 类,本文将简单介绍这两个类. 推流类 SrsRtcPubli ...

  2. 10、SRS4.0源代码分析之WebRTC推流端处理

    目标: 上一节分析了SRS4.0中WebRTC模块的总体架构和软件处理流程.接下来分析SRS4.0 WebRTC模块针对客户端推流连接上各种协议报文的软件处理逻辑. 内容: WebRTC模块在启动过程 ...

  3. SRS4.0源代码分析之RTMP拉流处理

    目标: 上一节分析了SRS针对推流客户端的处理逻辑,这里接下来分析针对拉流客户端的处理逻辑. SRS拉流端处理逻辑简单说就是SrsRtmpConn::do_playing()协程从SrsLiveCon ...

  4. 5、SRS4.0源代码分析之RTMP拉流处理

    目标: 上一节分析了SRS针对推流客户端的处理逻辑,这里接下来分析针对拉流客户端的处理逻辑. SRS拉流端处理逻辑简单说就是SrsRtmpConn::do_playing()协程从SrsLiveCon ...

  5. 4、SRS4.0源代码分析之RTMP推流处理

    目标:     本章我们将分析SRS4.0 RTMP服务模块与推流相关的代码处理逻辑. 内容:     根据上节内容可知,SRS4.0针对RTMP推流客户端的处理逻辑,主要在协程SrsRtmpConn ...

  6. 13、SRS4.0源代码分析之GB28181实验环境搭建

    前言 严格的说SRS4.0正式发布版本中已经去掉了GB28181相关的代码(主要时因为该特性还有一些Bug需要修复),本文目的是记录之前学习和使用SRS GB28181推流处理的一些心得. 内容 一. ...

  7. SRS4.0源码分析-序言

    <SRS4.0源码分析>专栏,会从 configure(配置),makefile(编译规则),main (入口函数), 带你一步一步了解 SRS 的主干代码逻辑. 这里分享一个本人阅读开源 ...

  8. SRS4.0源码分析-RTMP入口

    本文采用的 SRS 版本是 4.0-b8 , 下载地址:github 上篇文章 <SRS4.0源码分析-main> 讲解了 SRS main 函数的基本流程,但是可能有些朋友还是比较懵逼. ...

  9. SRS4.0源码分析-CMake

    本文采用的 SRS 版本是 4.0-b8 , 下载地址:github <SRS4.0源码分析-调试环境搭建> 讲了 SRS 在 Clion 里面的调试,本文主要讲解 srs-4.0-b8\ ...

最新文章

  1. 如何开始SLAM学习?
  2. linux eclipse go插件,Eclipse的Go插件(goclipse)
  3. RTP/RTSP/RTCP 协议详解
  4. ihtml2document能不能根据id获取dom_一段监视 DOM 的神奇代码
  5. MSDN Visual系列:在MOSS中创建一个BDC实体
  6. [Ext JS6实战] Ajax获取Tree Store
  7. 如何在virtualenv环境中安装指定的python版本
  8. sqoop连接Oracle数据库错误异常
  9. js constructor 和 instanceof
  10. spark操作读取hbase实例
  11. mysql挂科了咋办_大学第一学期挂科怎么办?
  12. ubuntu 版mysql客户端工具_ubuntu安装mysql可视化工具MySQL-workbench及简单操作
  13. 50家大厂面试万字精华总结,高性能mysql第五版pdf
  14. xmind思维导图模板_思维导图模板
  15. 主控芯片测试软件,主控芯片检测工具MyDiskTest的使用教程的详解【图文】
  16. GAMIT处理GLONASS数据
  17. 【ArcGIS自定义脚本工具】NDVI批量估算植被覆盖率
  18. 【SQL】小CASE
  19. 支持非对称命名空间访问的SPDK多路径验证
  20. 阿里云远程桌面连接不到的问题

热门文章

  1. linux(centos)基于docker搭建的oracle服务器,并上传到私有仓库
  2. 一个中年“码农”的困局
  3. sc计算机技术,计算机学院师生首次在超级计算旗舰会议SC上发表学术论文
  4. 2,GuardedMain(大象无形9.2)
  5. Cocos Creator 2.1.3 正式发布
  6. 程序员在工作时间应该如何着装
  7. 软件生存周期的过程,活动和任务
  8. 名义小组和焦点小组的关系
  9. HP-UX静默安装oracle11g过程
  10. generator代码生成器