srs4.0 webrtc分析(五)网络模块
网络模块介绍
webrtc 中音视频数据是通过udp传输,所以srs4.0要支持webrtc得开启一个udp server进行收发流。
具体代码见文件:srs_app_rtc_server.cpp
监听逻辑
srs_error_t SrsRtcServer::listen_udp()
{//首先获得监听的IP、PORTint port = _srs_config->get_rtc_server_listen();string ip = srs_any_address_for_listener();srs_assert(listeners.empty());//基于配置文件中的reuseport值,决定在同一个端口进行监听的次数。//默认创建一个socket监听一次。如果reuseport大于一就会创建多个socket监听多次// 每个SrsUdpMuxListener内部会启动一个协程进行网络收发int nn_listeners = _srs_config->get_rtc_server_reuseport();for (int i = 0; i < nn_listeners; i++) { //把该类的this指针传进去,接收到网络数据后回传到SrsRtcServerSrsUdpMuxListener* listener = new SrsUdpMuxListener(this, ip, port);if ((err = listener->listen()) != srs_success) {srs_freep(listener);return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);}listeners.push_back(listener);}return err;
}
SrsUdpMuxListener 网络处理类
启动监听
srs_error_t SrsUdpMuxListener::listen()
{srs_error_t err = srs_success;if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) {return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);}//启动一个协程接收网络数据srs_freep(trd);trd = new SrsSTCoroutine("udp", this, cid);if ((err = trd->start()) != srs_success) {return srs_error_wrap(err, "start thread");}return err;
}
修改udp 接收、发送buffer 大小,发送接收buffer 都设置为10M
void SrsUdpMuxListener::set_socket_buffer()
{int default_sndbuf = 0;// TODO: FIXME: Config it.int expect_sndbuf = 1024*1024*10; // 10Mint actual_sndbuf = expect_sndbuf;int r0_sndbuf = 0;if (true) {if ((r0_sndbuf = setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&actual_sndbuf, sizeof(actual_sndbuf))) < 0) {srs_warn("set SO_SNDBUF failed, expect=%d, r0=%d", expect_sndbuf, r0_sndbuf);}}int default_rcvbuf = 0;// TODO: FIXME: Config it.int expect_rcvbuf = 1024*1024*10; // 10Mint actual_rcvbuf = expect_rcvbuf;int r0_rcvbuf = 0;if (true) {if ((r0_rcvbuf = setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&actual_rcvbuf, sizeof(actual_rcvbuf))) < 0) {srs_warn("set SO_RCVBUF failed, expect=%d, r0=%d", expect_rcvbuf, r0_rcvbuf);}}
}
接收网络数据
srs_error_t SrsUdpMuxListener::cycle()
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "udp listener");}nn_loop++;int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT);if (nread <= 0) {if (nread < 0) {srs_warn("udp recv error nn=%d", nread);}// remux udp never returncontinue;}nn_msgs++;nn_msgs_stage++;// Handle the UDP packet.//这里handler就是初始化SrsUdpMuxListener是传进来的SrsRtcServer//所以收到的网络数据最终传到SrsRtcServer的on_udp_packet()函数err = handler->on_udp_packet(&skt);}
}
处理UDP 数据包
有四种类型的UDP 包:stun 、dtls、srtp、srtcp,基于包头的类型分别处理这四种包
如:判断是否是rtcp
// For RTCP, PT is [128, 223] (or without marker [0, 95]).
// Literally, RTCP starts from 64 not 0, so PT is [192, 223] (or without marker [64, 95]).
// @note For RTP, the PT is [96, 127], or [224, 255] with marker.
bool srs_is_rtcp(const uint8_t* data, size_t len)
{return (len >= 12) && (data[0] & 0x80) && (data[1] >= 192 && data[1] <= 223);
}
处理收到的udp包,根据包类型分别处理上面提到的四种包
srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
{SrsRtcConnection* session = NULL;uint64_t fast_id = skt->fast_id();// Try fast id first, if not found, search by long peer id.//基于用户的IP 、PORT信息找到该用户if (fast_id) {session = (SrsRtcConnection*)_srs_rtc_manager->find_by_fast_id(fast_id);}if (!session) {string peer_id = skt->peer_id();session = (SrsRtcConnection*)_srs_rtc_manager->find_by_id(peer_id);}// For STUN, the peer address may change.处理stun包if (!is_rtp_or_rtcp && srs_is_stun((uint8_t*)data, size)) {SrsStunPacket ping;if ((err = ping.decode(data, size)) != srs_success) {return srs_error_wrap(err, "decode stun packet failed");}return session->on_stun(skt, &ping);}// Note that we don't(except error) switch to the context of session, for performance issue.//处理rtp包if (is_rtp_or_rtcp && !is_rtcp) {++_srs_pps_rrtps->sugar;err = session->on_rtp(data, size);if (err != srs_success) {session->switch_to_context();}return err;}session->switch_to_context();//处理rtcp包if (is_rtp_or_rtcp && is_rtcp) {++_srs_pps_rrtcps->sugar;return session->on_rtcp(data, size);}//处理dtls包if (srs_is_dtls((uint8_t*)data, size)) {++_srs_pps_rstuns->sugar;return session->on_dtls(data, size);}
}
处理rtp
首先确定该包是谁发的,即找到该包的发送者
srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
{srs_error_t err = srs_success;SrsRtcPublishStream* publisher = NULL;if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) {return srs_error_wrap(err, "find");}srs_assert(publisher);return publisher->on_rtp(data, nb_data);
}
基于包中的ssrc找到发布者
srs_error_t SrsRtcConnection::find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher)
{srs_error_t err = srs_success;if (publishers_.size() == 0) {return srs_error_new(ERROR_RTC_RTCP, "no publisher");}//从包中解析出ssrcuint32_t ssrc = srs_rtp_fast_parse_ssrc(buf, size);if (ssrc == 0) {return srs_error_new(ERROR_RTC_NO_PUBLISHER, "invalid ssrc");}//基于ssrc找到该包的发布者map<uint32_t, SrsRtcPublishStream*>::iterator it = publishers_ssrc_map_.find(ssrc);if(it == publishers_ssrc_map_.end()) {return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no publisher for ssrc:%u", ssrc);}*ppublisher = it->second;return err;
}
发布者处理rtp 包,主要进行解密,然后基于ssrc分别把数据传给videotrack 、audioTrack进行处理
srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
{//解密 srtp ,得到 rtpif ((err = session_->transport_->unprotect_rtp(plaintext, &nb_plaintext)) != srs_success) {}//处理rtpif ((err = on_rtp_plaintext(plaintext, nb_plaintext)) != srs_success) { err = do_on_rtp_plaintext(pkt, &buf);}
}
do_on_rtp_plaintext()函数把数据传给SrsRtcVideoRecvTrack、SrsRtcAudioRecvTrack。
Track中的source是 SrsRtcStream* source;
srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket2*& pkt, SrsBuffer* buf){// For source to consume packet.uint32_t ssrc = pkt->header.get_ssrc();SrsRtcAudioRecvTrack* audio_track = get_audio_track(ssrc);SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc);if (audio_track) {pkt->frame_type = SrsFrameTypeAudio;if ((err = audio_track->on_rtp(source, pkt)) != srs_success) {return srs_error_wrap(err, "on audio");}} else if (video_track) {pkt->frame_type = SrsFrameTypeVideo;if ((err = video_track->on_rtp(source, pkt)) != srs_success) {return srs_error_wrap(err, "on video");}}}
把流转发给该发布者的所有消费者(订阅者), 流没有立即传发给消费者的网络模块,而是进入每个消费者的队列,消费者会启动一个新的协程 从队列中读取数据,再发到网络层。
srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt)
{srs_error_t err = srs_success;for (int i = 0; i < (int)consumers.size(); i++) {SrsRtcConsumer* consumer = consumers.at(i);if ((err = consumer->enqueue(pkt->copy())) != srs_success) {return srs_error_wrap(err, "consume message");}}return err;
}
下篇文章再分析消费者逻辑
如果有帮助欢迎关注我哈
srs4.0 webrtc分析(五)网络模块相关推荐
- srs4.0 webrtc分析(四)推流、播放类逻辑分析
介绍 分析srs4.0 webrtc 模块 ,推流端对应SrsRtcPublishStream.播放端对应SrsRtcPlayStream 类,本文将简单介绍这两个类. 推流类 SrsRtcPubli ...
- 10、SRS4.0源代码分析之WebRTC推流端处理
目标: 上一节分析了SRS4.0中WebRTC模块的总体架构和软件处理流程.接下来分析SRS4.0 WebRTC模块针对客户端推流连接上各种协议报文的软件处理逻辑. 内容: WebRTC模块在启动过程 ...
- SRS4.0源代码分析之RTMP拉流处理
目标: 上一节分析了SRS针对推流客户端的处理逻辑,这里接下来分析针对拉流客户端的处理逻辑. SRS拉流端处理逻辑简单说就是SrsRtmpConn::do_playing()协程从SrsLiveCon ...
- 5、SRS4.0源代码分析之RTMP拉流处理
目标: 上一节分析了SRS针对推流客户端的处理逻辑,这里接下来分析针对拉流客户端的处理逻辑. SRS拉流端处理逻辑简单说就是SrsRtmpConn::do_playing()协程从SrsLiveCon ...
- 4、SRS4.0源代码分析之RTMP推流处理
目标: 本章我们将分析SRS4.0 RTMP服务模块与推流相关的代码处理逻辑. 内容: 根据上节内容可知,SRS4.0针对RTMP推流客户端的处理逻辑,主要在协程SrsRtmpConn ...
- 13、SRS4.0源代码分析之GB28181实验环境搭建
前言 严格的说SRS4.0正式发布版本中已经去掉了GB28181相关的代码(主要时因为该特性还有一些Bug需要修复),本文目的是记录之前学习和使用SRS GB28181推流处理的一些心得. 内容 一. ...
- SRS4.0源码分析-序言
<SRS4.0源码分析>专栏,会从 configure(配置),makefile(编译规则),main (入口函数), 带你一步一步了解 SRS 的主干代码逻辑. 这里分享一个本人阅读开源 ...
- SRS4.0源码分析-RTMP入口
本文采用的 SRS 版本是 4.0-b8 , 下载地址:github 上篇文章 <SRS4.0源码分析-main> 讲解了 SRS main 函数的基本流程,但是可能有些朋友还是比较懵逼. ...
- SRS4.0源码分析-CMake
本文采用的 SRS 版本是 4.0-b8 , 下载地址:github <SRS4.0源码分析-调试环境搭建> 讲了 SRS 在 Clion 里面的调试,本文主要讲解 srs-4.0-b8\ ...
最新文章
- 如何开始SLAM学习?
- linux eclipse go插件,Eclipse的Go插件(goclipse)
- RTP/RTSP/RTCP 协议详解
- ihtml2document能不能根据id获取dom_一段监视 DOM 的神奇代码
- MSDN Visual系列:在MOSS中创建一个BDC实体
- [Ext JS6实战] Ajax获取Tree Store
- 如何在virtualenv环境中安装指定的python版本
- sqoop连接Oracle数据库错误异常
- js constructor 和 instanceof
- spark操作读取hbase实例
- mysql挂科了咋办_大学第一学期挂科怎么办?
- ubuntu 版mysql客户端工具_ubuntu安装mysql可视化工具MySQL-workbench及简单操作
- 50家大厂面试万字精华总结,高性能mysql第五版pdf
- xmind思维导图模板_思维导图模板
- 主控芯片测试软件,主控芯片检测工具MyDiskTest的使用教程的详解【图文】
- GAMIT处理GLONASS数据
- 【ArcGIS自定义脚本工具】NDVI批量估算植被覆盖率
- 【SQL】小CASE
- 支持非对称命名空间访问的SPDK多路径验证
- 阿里云远程桌面连接不到的问题