licode源码分析-媒体数据的处理流程
本文主要分析licode C++部分对视频流的处理流程。主要介绍licode从发送客户端接收视频流,然后经过内部的处理,再将视频流发送到接收客户端。
licode虽然是MCU模型,但提供的主要功能还是SFU,它不能将同一个房间内的音视频进行混合,仅提供了单路视频流的转码功能。
现在先假设licode和一个发送客户端、一个接收客户端相连并转发视频流。
这是licode从网络接收发送客户端视频数据,经过内部流转,再通过网络发送至接收客户端的总览图。
使用nICEr库接收网络数据
licode使用nICEr库与客户端进行ICE交换,最终由nICEr库与客户端建立网络连接,并负责和客户端进行数据收发。
NicerInterfaceImpl类是对nICEr库的封装,NicerConnection类是对ICE的封装,用于连接的建立和数据的收发,它们的关系如下图:
NicerConnection和NicerInterpfaceImpl类对象,主要运行在IOWorker线程内,这个线程主要用于建立连接、数据的收发。
NicerConnection对象主要运行在IOWorker线程,当Worker线程调用NicerConnection成员函数时,若在函数中直接访问数据成员,则有可能造成数据竞争。在licode实现中,通过异步的任务队列,将线程不安全的成员函数,封装成了线程安全的函数。具体方式是,在有可能发生数据竞争的函数中,会封装一个异步任务丢到IOWorker队列,由IOWorker线程去处理,避免了Worker线程直接访问NicerConnection数据成员,从而避免了数据竞争。这样仅在Worker向IOWorker任务队列丢任务时需要上锁,这将大大减小临界区,从而提高了并发。
void NicerConnection::startSync()
{...ice_handler_vtbl_->msg_recvd = &NicerConnection::msg_recvd;...
}
在licode与客户端建立连接的过程中,在NicerConnection::startSync()函数中会把其静态函数NicerConnection::msg_recvd()注册到nICEr库中,当nICEr库收到客户端发送过来的数据时,就会回调这个函数,将数据包传送上来。
int NicerConnection::msg_recvd(void *obj, nr_ice_peer_ctx *pctx, nr_ice_media_stream *stream, int component_id,unsigned char *msg, int len)
{NicerConnection *conn = reinterpret_cast<NicerConnection*>(obj);conn->onData(component_id, reinterpret_cast<char*> (msg), static_cast<unsigned int> (len));return 0;
}
这个函数是静态函数,在被调用时,obj参数中保存的是具体的NicerConnection对象。通过conn对象将nICEr库收到数据包送至conn对象的onData()函数中。
void NicerConnection::onData(unsigned int component_id, char* buf, int len)
{IceState state;/*获取ICE的状态*/{boost::mutex::scoped_lock lock(close_mutex_);state = this->checkIceState();}/*如果处于ready状态,则可以继续向下传送。*/if (state == IceState::READY) {/*以下代码是将收到的数据包封装成DataPacket对象*/packetPtr packet (new DataPacket());memcpy(packet->data, buf, len);/*用于指示是rtp还是rtcp。*/packet->comp = component_id; packet->length = len;/*记录packet的接收时间*/packet->received_time_ms = ClockUtils::timePointToMs(clock::now());if (auto listener = getIceListener().lock()) {/*将packet分发至其订阅者DtlsTransport*/listener->onPacketReceived(packet); }}
}void DtlsTransport::start()
{ice_->setIceListener(shared_from_this()); /*设置IceConnection的订阅者*/...
}
如果ICE处于READY状态,说明可以将数据包往下流转。先是将收到的数据封装成DataPacket对象,以方便之后的处理。接着将数据包发送给它的订阅者DtlsTransport。
在NicerConnection在DtlsTransport::start()函数中,通过setIceListener()设置了它的订阅者DtlsTransport。
void Transport::onPacketReceived(packetPtr packet)
{std::weak_ptr<Transport> weak_transport = Transport::shared_from_this();/*封装为一个异步的任务丢到Worker线程的任务队列上*/worker_->task([weak_transport, packet]() {if (auto this_ptr = weak_transport.lock()){if (packet->length > 0) {/*将收到的packet,传送至DtlsTransport。*/this_ptr->onIceData(packet); }if (packet->length == -1){this_ptr->running_ = false;return;}}});}
NicerConnection先将数据包发送到了DtlsTransport父类的函数Transport::onPacketReceived()函数中。
因为子类覆写了虚函数onIceData()函数,所以this_ptr->onIceData(packet)调用的是DtlsTransport类内的onIceData()函数。
Transport::onPacketReceived()函数的调用,还是在IOWorker线程中,现在IOWorker线程会将收到的数据包封装到一个异步的任务中,让后将这个任务丢到WebRtcConnection所在的Worker线程中,接着由Worker线程处理收到的数据包。
[weak_transport, packet]() {if (auto this_ptr = weak_transport.lock()){if (packet->length > 0) {/*将收到的packet,传送至DtlsTransport。*/this_ptr->onIceData(packet); }if (packet->length == -1){this_ptr->running_ = false;return;}}}
这是在Transport::onPacketReceived()函数中封装的lambda对象,这个lambda对象会在WebRtcConnection所在的Worker线程中执行。
现在发送客户端的数据包已经由IOWorker线程通过nICEr库从网络接收。先是封装成DataPacket对象,
再进一步封装为一个异步的任务丢到了Woker线程的任务队列,交由Worker线程处理。Woker线程在执行上面的lambda时,会将数据包送至传送至DtlsTransport内。
上面的整个过程如下图所示:
DtlsTransport对输入数据的处理
现在经过IOWorker线程从网络接收数据包,经过流转,最终将数据包封装成一个任务丢到了Woker线程的任务队列,Worker线程执行到IOWorker线程丢过来的任务时,会通过调用了DtlsTransport::onIceData()函数,将数据包传递至DtlsTransport类内。
现在DtlsTransport对象会在Worker线程内对数据包进行处理。
void DtlsTransport::onIceData(packetPtr packet)
{.../*根据数据类型的不同,分类处理。*/if (DtlsTransport::isDtlsPacket(data, len)) { /*DTLS握手时的数据包,交由DtlsSocketContext处理。*/...} else if (this->getTransportState() == TRANSPORT_READY){/*RTP和RTCP数据包*//*如果使用了srtp,则需要使用SrtpChannel类进行解密。*/if (srtp != NULL) {...}if (auto listener = getTransportListener().lock()) {/*将解密后的数据,分发至其订阅者WebRtcConnection。*/listener->onTransportData(unprotect_packet, this); }}
}
DtlsTransport会根据数据包的类型,进行分类处理。若是DTLS握手报文,则交由DtlsSocketContext处理,否则就是RTP/RTCP数据包。
若开启了srtp,需要对接收的数据送至SrtpChannel进行解密。最后将解密后的数据包,分发给订阅者WebRtcConnection。
上面流程如下图:
流水线-pipeline
在介绍WebRtcConnection对数据包处理之前,先介绍一下licode中的流水线。流水线处理是licode中最具特色的地方,非常便于对数据包处理的扩展。
Pipeline的实现还是有些复杂的,此处并不介绍Pipeline是怎么实现,只讲一讲如何使用流水线,流水线是如何运行起来的,以及数据包是如何在流水线内部流转的。
Pipeline类图
pipeline的handle
handle负责对媒体数据的具体处理,分为三种类型:IN、OUT、BOTH。
在licode中,每个与客户端连接的WebRtcConnection内,都会有两条双向的流水线。一条在WebRtcConnection类内,另一条在MediaStream类内。
流水线是双向的,在WebRtcConnection类内的流水线,IN模块只用于处理DTLSTransport类发送过来的数据,也就是收到的网络数据。OUT模块只用于MediaStream发送过来的数据,数据经过OUT模块处理后发送给DTLSTransport。BOTH模块会处理这两个方向上的数据。
IN类型
IN类型的模块,需要继承自InboundHandler类,其中最重要的是read()。流水线的上一个模块在处理完数据包后,会调用本模块的read()函数,将数据包交由本模块。所以这个函数用于接收上个处理模块发送过来的数据包。当在本模块中处理完毕后,需要调用fireRead()函数将数据包传给下一个模块。
void PacketCodecParser::read(Context *ctx, std::shared_ptr<DataPacket> packet)
{RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);if (!chead->isRtcp() && enabled_) {...}/*传给下一个模块*/ctx->fireRead(std::move(packet));
}
PacketCodecParser是一个IN类型的模块,上一个模块处理完数据后,会将数据包丢到这里的read()函数,在这个函数中处理完毕后,会通过ctx->fireRead(std::move(packet))将数据包丢给下一个处理模块。
OUT类型
OUT类型的模块,需要继承自OutboundHandler类。write()函数等同于IN类中的read()函数,用于接收上一个模块发送过来的数据包。
void FecReceiverHandler::write(Context *ctx, std::shared_ptr<DataPacket> packet) {if (enabled_ && packet->type == VIDEO_PACKET) {...}/*传给下一个模块*/ctx->fireWrite(std::move(packet));
}
FecReceiverHandler是一个OUT类型的模块,在write()函数中接收上一个模块传入的数据包,处理结束后,通过ctx->fireWrite(std::move(packet))传递给下一个模块。
BOTH类型
BOTH类型的模块,需要继承自Handler类。在WebRtcConnection流水线中,read()函数用于接收从网络方向来的数据包,write()用于接收向网络方向发送的数据包。
void RtcpProcessorHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (packet->data);if (chead->isRtcp()) {...}processor_->checkRtcpFb();/*丢给下一个模块*/ctx->fireRead(std::move(packet));
}void RtcpProcessorHandler::write(Context *ctx, std::shared_ptr<DataPacket> packet) {RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);if (chead->isFeedback()) {...}/*丢给下一个模块*/ctx->fireWrite(std::move(packet));
}
RtcpProcessorHandler是一个BOTH类型的模块,相当于同时具有IN类型和OUT类型。read()函数同IN类型,write()函数同OUT类型。
WebRtcConnection中流水线的建立
void WebRtcConnection::initializePipeline()
{.../*向流水线中添加处理模块*/pipeline_->addFront(std::make_shared<ConnectionPacketReader>(this)); pipeline_->addFront(std::make_shared<SenderBandwidthEstimationHandler>()); pipeline_->addFront(std::make_shared<RtpPaddingManagerHandler>()); pipeline_->addFront(std::make_shared<ConnectionPacketWriter>(this)); /*初始化流水线*/pipeline_->finalize();...
}
调用addFront()向流水线中添加处理模块,IN模块添加到PipelineBase::inCtxs_中,OUT模块添加到PipelineBase::outCtxs_中,BOTH模块将同时添加到PipelineBase::inCtxs_和PipelineBase::outCtxs_中。
void Pipeline::finalize()
{...if (!inCtxs_.empty()) {front_ = dynamic_cast<InboundLink*>(inCtxs_.front());/*将inCtxs保存的模块组成一条链表*/for (size_t i = 0; i < inCtxs_.size() - 1; i++) {inCtxs_[i]->setNextIn(inCtxs_[i+1]);}/*最后一个结点的next指针置为nullptr*/inCtxs_.back()->setNextIn(nullptr);}...if (!outCtxs_.empty()) {back_ = dynamic_cast<OutboundLink*>(outCtxs_.back());/*将outCtxs保存的模块组成一条链表*/for (size_t i = outCtxs_.size() - 1; i > 0; i--) {outCtxs_[i]->setNextOut(outCtxs_[i-1]);}/*最开始一个结点的next指针置为nullptr*/outCtxs_.front()->setNextOut(nullptr);}
...
}
Pipeline::finalize()函数会将流水线按照类型,组成两条链表。
在WebRtcConnection流水线中,front_指向的是网络输入数据处理的流水线;back_用于指向处理输出到网络数据的流水线。
建立好的流水线如下图:
数据包在流水线中流转
输入流水线的流转
void RtpPaddingManagerHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {ctx->fireRead(std::move(packet)); /*丢给下一个模块*/
}
RtpPaddingManagerHandler类没有对数据包进行处理直接丢给了下一个模块。
void ContextImpl::fireRead(std::shared_ptr<DataPacket> packet) override
{auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {/*直接调用下一个模块的read()处理函数*/this->nextIn_->read(std::move(packet)); }
}
RtpPaddingManagerHandler的下一个模块是SenderBandwidthEstimationHandler,且这两者是通过链表相连的,所以this->nextIn_就是SenderBandwidthEstimationHandler模块,通过调用SenderBandwidthEstimationHandler的read()函数,就将packet送至了下一个模块。
输出流水线的流转
void RtpPaddingManagerHandler::write(Context *ctx, std::shared_ptr<DataPacket> packet) {RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);...ctx->fireWrite(packet); /*丢给下一个模块*/
}
void ContextImpl::fireWrite(std::shared_ptr<DataPacket> packet) override
{auto guard = this->pipelineWeak_.lock();if (this->nextOut_) {this->nextOut_->write(std::move(packet)); /*传递给下一个模块*/}
}
RtpPaddingManagerHandler所在链表的下一个结点是this->nextOut,也就是ConnectionPacketWriter模块,调用其write()函数就将数据包丢给ConnectionPacketWriter模块处理。
WebRtcConnection对输入数据的处理
WebRtcConnection中的流水线有两个方向,一是从DTLSTransport接收数据,数据处理后送至MediaStream,这条流水线我们称为输入流水线;另外一个是相反的方向,我们称为输出流水线。
ConnectionPacketReader模块是输入流水线的最后一个模块。ConnectionPacketWriter是输出流水线的最后一个模块。
void WebRtcConnection::onTransportData(std::shared_ptr<DataPacket> packet, Transport *transport)
{...asyncTask([packet] (std::shared_ptr<WebRtcConnection> connection) {.../*将接收的数据packet,投递到pipeline。*/if (connection->pipeline_) {connection->pipeline_->read(std::move(packet));}});
}
DTLSTransport在向WebRtcConnection传递数据的时候,使用也使用了投递任务的方式。但在创建DTLSTransport对象时,使用的就是WebRtcConnection内部的woker_,所以这两个类会在同一个Worker线程内运行。DTLSTransport直接将数据送给WebRtcConnection也是没有问题的,使用任务队列也是可以的。
WebRtcConnection接收到DTLSTransport传递过来的数据后,将其送至流水线。pipeline_->read()是输入流水线的入口。
void Pipeline::read(std::shared_ptr<DataPacket> packet)
{if (!front_) {return;}/*将数据丢给流水线的首个模块*/front_->read(std::move(packet));
}
如上图,在输入流水线中,front_指向的是RtpPaddingManagerHandler模块,所以这个模块是流水线的入口模块。
数据依次经过流水线的RtpPaddingManagerHandler模块、SenderBandwidthEstimationHandler模块,最后到达ConnectionPacketReader模块。流水线处理后的数据,会送至ConnectionPacketReader::read()函数。
void ConnectionPacketReader::read(Context *ctx, std::shared_ptr<DataPacket> packet) override
{/*将输入流水线处理后的数据包,送至WebRtcConnection的read()函数。*/connection_->read(std::move(packet));
}
经过输入流水线的处理,数据包最终来到了WebRtcConnection::read()函数。
void WebRtcConnection::read(std::shared_ptr<DataPacket> packet)
{.../*遍历每一条MediaStream*/forEachMediaStream([packet, transport, ssrc] (const std::shared_ptr<MediaStream> &media_stream){/*找到数据包对应的MediaStream*/if (media_stream->isSourceSSRC(ssrc) || media_stream->isSinkSSRC(ssrc)) {/*将数据包发送至MediaStream*/media_stream->onTransportData(packet, transport);}});}
}
WebRtcConnection从流水线收到处理后的数据包,根据数据包中的ssrc找到对应的MediaStream,之后将数据包送至MediaStream。
void MediaStream::onTransportData(std::shared_ptr<DataPacket> incoming_packet, Transport *transport)
{.../*拷贝一份数据*/std::shared_ptr<DataPacket> packet = std::make_shared<DataPacket>(*incoming_packet);
.../*投递至MediaStream线程所在的任务队列*/worker_->task([stream_ptr, packet]{.../*media stream将数据写入流水线*/if (stream_ptr->pipeline_) {/*进入流水线,开始处理。*/stream_ptr->pipeline_->read(std::move(packet)); }});
}
此时还处于WebRtcConnection所在的Worker线程,在MediaStream::onTransportData()函数内,会拷贝一份数据,将数据和对应的MediaStream打包成一个任务,然后将任务投递到MediaStream所在Worker线程的任务队列。
WebRtcConnection(std::shared_ptr<Worker> worker, ...);
MediaStream(std::shared_ptr<Worker> worker, ...);
在创建WebRtcConnection和MediaStream的时候,若使用相同的worker,则两者会在同一个Worker线程运行,否则就是在不同的Worker线程运行。在不同的Worker线程内运行,使用任务队列传递数据是必要的。
WebRtcConnection对数据包的处理如下图:
MediaStream对输入数据的处理
[stream_ptr, packet]{.../*media stream将数据写入流水线*/if (stream_ptr->pipeline_) {/*进入流水线,开始处理。*/stream_ptr->pipeline_->read(std::move(packet)); }}
MediaStream所在的Worker线程在处理任务时,会执行上面的lambda对象,也就把数据送至了MediaStream内部的输入流水线。
MediaStream内部有一条更为负责的流水线,其中输入流水线用于处理WebRtcConnection发送过来的数据,也就是从网络接收的数据,输出流水线用于处理其他WebRtcConnection的MediaStream发送过来的数据。
void Pipeline::read(std::shared_ptr<DataPacket> packet)
{if (!front_) {return;}/*将数据送至流水线的第一个模块*/front_->read(std::move(packet));
}
PacketCodecParser是流水线的第一个处理模块,front_就指向该模块,front_->read()会将数据送至PacketCodecParser模块。数据送至流水线后,流水线的各个模块开始依次处理。
void PacketReader::read(Context *ctx, std::shared_ptr<DataPacket> packet) override
{/*将流水线处理后packet重新送至MediaStream*/media_stream_->read(std::move(packet));
}
PacketReader是输入流水线的最后一个模块,经过复杂的流水线处理后,在流水线的最后一个模块中,调用MediaStream::read()将处理后的数据包,重新送回MediaStream。
void MediaStream::read(std::shared_ptr<DataPacket> packet)
{...if (isVideoSourceSSRC(recvSSRC) && video_sink) {parseIncomingPayloadType(buf, len, VIDEO_PACKET);parseIncomingExtensionId(buf, len, VIDEO_PACKET);/*将视频数据分发给订阅者*/video_sink->deliverVideoData(std::move(packet)); } else if (isAudioSourceSSRC(recvSSRC) && audio_sink) {parseIncomingPayloadType(buf, len, AUDIO_PACKET);parseIncomingExtensionId(buf, len, AUDIO_PACKET);/*将音频数据分发给订阅者*/audio_sink->deliverAudioData(std::move(packet));} ...
}
从网络接收的数据,先是经过WebRtcConnection流水线的处理,再经过MediaStream流水线的处理。现在需要将其分发给接收客户端的MediaStream了。
video_sink->deliverVideoData(std::move(packet)); 用于将视频数据分发给订阅者。
audio_sink->deliverAudioData(std::move(packet)); 用于将音频数据分发给订阅者。
void MediaSource::setVideoSink(std::weak_ptr<MediaSink> video_sink)
{boost::mutex::scoped_lock lock(monitor_mutex_);video_sink_ = video_sink;
}
MediaSource是MediaStream的父类。
接收客户端在licode中,也有一个对应的WebRtcConnection和属于它的MediaStream。在上层的应用中,需要调用发送端MediaStream中MediaSource::setVideoSink(),将接收端的MediaStream作为参数。这样接收端的MediaStream就成为了发送端MediaStream的订阅者。
/*从video_sink->deliverVideoData(std::move(packet)); 开始,以下执行的这些函数,都是接收端MediaStream对象中的成员函数。*/
int MediaSink::deliverVideoData(std::shared_ptr<DataPacket> data_packet)
{/*会调用子类的覆写的虚函数*/return deliverVideoData_(data_packet);
}/*覆写MediaSink父类的deliverVideoData_虚函数*/
int MediaStream::deliverVideoData_(std::shared_ptr<DataPacket> video_packet)
{if (video_enabled_) {sendPacketAsync(std::make_shared<DataPacket>(*video_packet));}return video_packet->length;
}void MediaStream::sendPacketAsync(std::shared_ptr<DataPacket> packet)
{.../*投递任务发送packet*/worker_->task([stream_ptr, packet]{stream_ptr->sendPacket(packet);});
}
video_sink->deliverVideoData()中video_sink表示接收端的MediaStream对象,此时还在接收端MediaStream对象所在的Worker线程,在向发送端MediaStream对象传递数据时,会封装成一个任务投递过去。发送端MediaStream对象和接收端MediaStream对象可能在同一个Woker线程,也可能不在同一个线程,所以为了避免数据竞争,需要把要传递给接收端MediaStream对象的数据封装为一个任务,投递到接收端MediaStream对象所在Worker线程的任务队列。
WebRtcConnection(std::shared_ptr<Worker> worker, std::shared_ptr<IOWorker> io_worker,...);
在为发送客户端和接收客户端分别创建WebRtcConnection对象时,有两个很重要的参数是,一个是Worker线程参数worker和IOWorker线程参数io_worker。在创建WebRtcConnection对象时,若worker参数相同,说明发送端WebRtcConnection对象和接收端WebRtcConnection对象是在同一个Worker线程内。若io_worker参数相同,则licode接收发送端的数据和发送给接收端的数据是在同一个IOWorker线程内。
至此,licode已经处理完发送端发送过来的数据了,接下来就是接收端MediaStream对象对数据的处理了。
目前的数据流程如下图:
MediaStream对输出数据的处理
接收端MediaStream对象所在的Worker线程任务队列中已经有一个,发送端MediaStream对象投递的任务。这个任务就是一个lambda对象。
/*MediaStream::sendPacketAsync()函数中封装的任务。*/
[stream_ptr, packet]{stream_ptr->sendPacket(packet);}
接收端MediaStream对象所在的Worker线程,取出这个任务,在执行这个lambda对象时,数据就被接收到了接收端MediaStream::sendPacket()函数内。
void MediaStream::sendPacket(std::shared_ptr<DataPacket> p)
{.../*将packet送至输出流水线进行处理*/if (pipeline_) {pipeline_->write(std::move(p));}
}
接收到的数据被送至了MediaStream类内的输出流水线。
void Pipeline::write(std::shared_ptr<DataPacket> packet)
{if (!back_) {return;}/*送至输出流水线的第一个处理模块中*/back_->write(std::move(packet));
}
输出流水线处理模块组成的链表中,back_指向的是RtcpProcessorHandler模块,调用RtcpProcessorHandler::write()函数,将数据送至输出流水线,开始处理数据。
被标记为OUT的是输出流处理模块,数据从第一个处理RtcpProcessorHandler开始处理,在流水线中一个模块一个模块的流转,最后到到达了流水线的最后一个处理模块PacketWriter。
void PacketWriter::write(Context *ctx, std::shared_ptr<DataPacket> packet) override
{media_stream_->write(std::move(packet));
}
在PacketWriter::write()中,数据会从流水线重新送回MediaStream内。
void MediaStream::write(std::shared_ptr<DataPacket> packet)
{if (connection_) {connection_->send(packet);}
}
MediaStream收到经过输出流水线处理的数据后,将数据送至WebRtcConnection。
void WebRtcConnection::send(std::shared_ptr<DataPacket> packet)
{asyncTask([packet] (std::shared_ptr<WebRtcConnection> connection) {if (connection->pipeline_) {/*将接收到的packet,送至输出流水线。*/connection->pipeline_->write(std::move(packet));}});
}
此时还在MediaStream所在的Worker线程,为了向WebRtcConnection传递数据,需要将数据打包成一个任务,然后投递到WebRtcConnection所在Worker线程的任务队列。
WebRtcConnection对输出数据的处理
/*封装为lambda的任务*/
[packet] (std::shared_ptr<WebRtcConnection> connection) {if (connection->pipeline_) {/*将接收到的packet,发送至下面函数中。*/connection->pipeline_->write(std::move(packet));}}
WebRtcConnection所在的Worker线程从任务队列取出上面的lambda对象,处理该任务。处理任务时,会将从MediaStream接收的数据送至输出流水线。
void Pipeline::write(std::shared_ptr<DataPacket> packet)
{if (!back_) {return;}/*送至输出流水线的第一个模块*/back_->write(std::move(packet));
}
SenderBandwidthEstimationHandler是流水线的第一个模块,所以首先送至该模块处理。
void ConnectionPacketWriter::write(Context *ctx, std::shared_ptr<DataPacket> packet) override {/*将数据送回WebRtcConnection*/connection_->write(std::move(packet)); }
ConnectionPacketWriter是流水线的最后一个模块,当流水线的数据送至该模块时,该模块会将数据重新送回WebRtcConnection。
void WebRtcConnection::write(std::shared_ptr<DataPacket> packet)
{.../*找数据所属的DTLSTransport*/Transport *transport = (bundle_ || packet->type == VIDEO_PACKET) ? video_transport_.get() : audio_transport_.get();
.../*将待发送的数据,交由DtlsTransport处理。*/transport->write(packet->data, packet->length);
}
WebRtcConnection接收到数据后,会将数据送至DTLSTransport。
DtlsTransport对输入数据的处理
void DtlsTransport::write(char* data, int len)
{...if (this->getTransportState() == TRANSPORT_READY) {.../*使用SrtpChannel对数据进行加密*/if (ice_->checkIceState() == IceState::READY) {/*将数据送至NicerConnection*/writeOnIce(comp, protectBuf_, length); }}
}void Transport::writeOnIce(int comp, void* buf, int len)
{if (!running_){return;}/*ice将packet送至NicerConnection,由Nicer将数据发送至目的地。*/ice_->sendData(comp, buf, len);
}
DTLSTransport从WebRtcConnection接收到数据后,调用父类的Transport::writeOnIce()函数,将数据送至NicerConnection。
int NicerConnection::sendData(unsigned int component_id, const void* buf, int len)
{.../*将待发送的数据,拷贝至packet buf中。*/memcpy(packet->data, buf, len);packet->length = len;.../*向IOWorker投递发送任务*/async([nicer, packet, peer, stream, component_id, len] (std::shared_ptr<NicerConnection> this_ptr) {UINT4 r = nicer->IceMediaStreamSend(peer,stream,component_id,reinterpret_cast<unsigned char*>(packet->data),len);...});return len;
}
此时还在WebRtcConnection所在的Worker线程,现在需要将数据传递给NicerConnection所在IOWorker线程,所以将待发送的数据打包成一个任务,然后投递到NicerConnection所在IOWorker线程的任务队列。
使用nICEr库发送网络数据
/*封装为lambda的任务*/
[nicer, packet, peer, stream, component_id, len] (std::shared_ptr<NicerConnection> this_ptr) {UINT4 r = nicer->IceMediaStreamSend(peer,stream,component_id,reinterpret_cast<unsigned char*>(packet->data),len);...})int NicerInterfaceImpl::IceMediaStreamSend(nr_ice_peer_ctx *pctxp, nr_ice_media_stream *stream, int component, unsigned char *buffer, size_t length)
{return nr_ice_media_stream_send(pctxp, stream, component, buffer, length);
}
IOWorker线程会从队列中取出上面的任务,然后调用NicerInterfaceImpl::IceMediaStreamSend(),接着调用nICEr库的发送数据接口nr_ice_media_stream_send(),把需要发送给接收客户端的数据交由nICEr库负责发送。
完整流程
至此从发送客户端接收数据,经过licode内部处理,再发送给接收客户端的整个流程介绍完毕了。
整个流程如下图:
一对多
使用OneToManyProcessor类,可以将一个发送端的数据,转发给多个接收端。
OneToManyProcessor订阅发送端中的MediaStream,其他接收端的MediaStream订阅OneToManyProcessor。当发送端发送过来数据时,发送端的MediaStream将数据转发至OneToManyProcessor,接着该类再将数据分发给它的所有订阅者。
这是典型的观察者模式
注意:发送客户端MediaStream
指的是licode中建立的MediaStream,它可以代表发送客户端发送的媒体数据流。接收客户端MediaStream同理。
licode源码分析-媒体数据的处理流程相关推荐
- licode源码分析-线程模型
licode源码分析-线程模型 服务器一般都会服务于大量的用户,所以服务端程序的性能往往决定服务用户的多少.现在服务器上的CPU都是多核的,服务端程序为了充分发挥CPU的性能,会使用多进程或多线程.而 ...
- Ant-design 源码分析之数据展示(二)Badge
Ant-design 源码分析之数据展示(二)Badge 2021SC@SDUSC 一.组件结构 1.ant代码结构 2.rc-ant代码结构 3.组件结构 ant中Badge的index.tsx中引 ...
- HBase源码分析之HRegion上compact流程分析(三)
在<HBase源码分析之HRegion上compact流程分析(二)>一文中,我们没有讲解真正执行合并的CompactionContext的compact()方法.现在我们来分析下它的具体 ...
- Spark源码分析之Sort-Based Shuffle读写流程
一 概述 我们知道Spark Shuffle机制总共有三种: # 未优化的Hash Shuffle:每一个ShuffleMapTask都会为每一个ReducerTask创建一个单独的文件,总的文件数是 ...
- 【SA8295P 源码分析】18 - Camera Bringup 流程 及 源码分析
[SA8295P 源码分析]18 - Camera Bringup 流程 及 源码分析 一.Camera Bringup 流程 1.1 CameraConfigSA8295.c 配置文件解析 1.2 ...
- SOFA 源码分析 — 链路数据透传
前言 SOFA-RPC 支持数据链路透传功能,官方解释: 链路数据透传功能支持应用向调用上下文中存放数据,达到整个链路上的应用都可以操作该数据. 使用方式如下,可分别向链路的 request 和 re ...
- leveldb源码分析:数据查询
leveldb数据查询 查询的示例代码如下: string res; status = db->Get(ReadOptions(), "KeyNameExample", &a ...
- Nginx源码分析:master/worker工作流程概述
nginx源码分析 nginx-1.11.1 参考书籍<深入理解nginx模块开发与架构解析> Nginx的master与worker工作模式 在生成环境中的Nginx启动模式基本都是以m ...
- zookeeper源码分析之三客户端发送请求流程
znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...
最新文章
- c# mysql executescalar_C# 操作MySQL数据库, ExecuteScalar()方法执行T-SQL语句, COUNT(*), 统计数据...
- 【Java多线程】实现Runnable接口方式 / 继承Thread类方式;使用synchronized锁实现线程安全;线程安全的懒汉式单例模式;死锁问题示例
- 面向对象 solid_用简单的英语解释面向对象程序设计的SOLID原理
- 二叉树的前中后层遍历
- Facebook宣布进一步推广Live Video功能
- 从棋盘左上角到右下角共有多少种走法
- [寒江孤叶丶的CrossApp之旅_07][入门系列]CrossApp中信息框CAAlertView的使用
- 解决win7任务栏谷歌浏览器chrome图标丢失、异常空白的问题
- Raspberry Pi树莓派分类和其相似产品介绍
- wifi又被隔壁老王蹭了,这样设置路由器,老王再也蹭不了你的wifi了……路由器设置实用教程基础篇
- win7计算机u盘不显示盘符,win7系统识别U盘但不显示盘符该如何解决?
- python如何编写温度转换_Python温度转换实例分析
- 软件测试薪资标准,软件测试月薪过万需要具备哪些技能?
- 12.15 村长项目借口
- 永恒python图片_第1章 python基础
- 内网时间同步,ntp与ntpdate区别,与ntp服务器搭建
- 北京林业大c语言程序设计考试试题,2016年北京林业大学信息学院程序设计语言、数据结构(上机操作)复试笔试最后押题五套卷...
- 迷宫寻宝(一) 搜索
- Weex工具链的奥秘
- 通过表结构生成数据库设计文档