本文主要分析licode C++部分对视频流的处理流程。主要介绍licode从发送客户端接收视频流,然后经过内部的处理,再将视频流发送到接收客户端。

licode虽然是MCU模型,但提供的主要功能还是SFU,它不能将同一个房间内的音视频进行混合,仅提供了单路视频流的转码功能。

现在先假设licode和一个发送客户端、一个接收客户端相连并转发视频流。

这是licode从网络接收发送客户端视频数据,经过内部流转,再通过网络发送至接收客户端的总览图。

使用nICEr库接收网络数据

licode使用nICEr库与客户端进行ICE交换,最终由nICEr库与客户端建立网络连接,并负责和客户端进行数据收发。

NicerInterfaceImpl类是对nICEr库的封装,NicerConnection类是对ICE的封装,用于连接的建立和数据的收发,它们的关系如下图:

NicerConnection和NicerInterpfaceImpl类对象,主要运行在IOWorker线程内,这个线程主要用于建立连接、数据的收发。

NicerConnection对象主要运行在IOWorker线程,当Worker线程调用NicerConnection成员函数时,若在函数中直接访问数据成员,则有可能造成数据竞争。在licode实现中,通过异步的任务队列,将线程不安全的成员函数,封装成了线程安全的函数。具体方式是,在有可能发生数据竞争的函数中,会封装一个异步任务丢到IOWorker队列,由IOWorker线程去处理,避免了Worker线程直接访问NicerConnection数据成员,从而避免了数据竞争。这样仅在Worker向IOWorker任务队列丢任务时需要上锁,这将大大减小临界区,从而提高了并发。

void NicerConnection::startSync()
{...ice_handler_vtbl_->msg_recvd = &NicerConnection::msg_recvd;...
}

在licode与客户端建立连接的过程中,在NicerConnection::startSync()函数中会把其静态函数NicerConnection::msg_recvd()注册到nICEr库中,当nICEr库收到客户端发送过来的数据时,就会回调这个函数,将数据包传送上来。

int NicerConnection::msg_recvd(void *obj, nr_ice_peer_ctx *pctx, nr_ice_media_stream *stream, int component_id,unsigned char *msg, int len)
{NicerConnection *conn = reinterpret_cast<NicerConnection*>(obj);conn->onData(component_id, reinterpret_cast<char*> (msg), static_cast<unsigned int> (len));return 0;
}

这个函数是静态函数,在被调用时,obj参数中保存的是具体的NicerConnection对象。通过conn对象将nICEr库收到数据包送至conn对象的onData()函数中。

void NicerConnection::onData(unsigned int component_id, char* buf, int len)
{IceState state;/*获取ICE的状态*/{boost::mutex::scoped_lock lock(close_mutex_);state = this->checkIceState();}/*如果处于ready状态,则可以继续向下传送。*/if (state == IceState::READY) {/*以下代码是将收到的数据包封装成DataPacket对象*/packetPtr packet (new DataPacket());memcpy(packet->data, buf, len);/*用于指示是rtp还是rtcp。*/packet->comp = component_id;      packet->length = len;/*记录packet的接收时间*/packet->received_time_ms = ClockUtils::timePointToMs(clock::now());if (auto listener = getIceListener().lock()) {/*将packet分发至其订阅者DtlsTransport*/listener->onPacketReceived(packet);     }}
}void DtlsTransport::start()
{ice_->setIceListener(shared_from_this());  /*设置IceConnection的订阅者*/...
}

如果ICE处于READY状态,说明可以将数据包往下流转。先是将收到的数据封装成DataPacket对象,以方便之后的处理。接着将数据包发送给它的订阅者DtlsTransport。

在NicerConnection在DtlsTransport::start()函数中,通过setIceListener()设置了它的订阅者DtlsTransport。

void Transport::onPacketReceived(packetPtr packet)
{std::weak_ptr<Transport> weak_transport = Transport::shared_from_this();/*封装为一个异步的任务丢到Worker线程的任务队列上*/worker_->task([weak_transport, packet]() {if (auto this_ptr = weak_transport.lock()){if (packet->length > 0) {/*将收到的packet,传送至DtlsTransport。*/this_ptr->onIceData(packet);   }if (packet->length == -1){this_ptr->running_ = false;return;}}});}

NicerConnection先将数据包发送到了DtlsTransport父类的函数Transport::onPacketReceived()函数中。

因为子类覆写了虚函数onIceData()函数,所以this_ptr->onIceData(packet)调用的是DtlsTransport类内的onIceData()函数。

Transport::onPacketReceived()函数的调用,还是在IOWorker线程中,现在IOWorker线程会将收到的数据包封装到一个异步的任务中,让后将这个任务丢到WebRtcConnection所在的Worker线程中,接着由Worker线程处理收到的数据包。

[weak_transport, packet]() {if (auto this_ptr = weak_transport.lock()){if (packet->length > 0) {/*将收到的packet,传送至DtlsTransport。*/this_ptr->onIceData(packet);   }if (packet->length == -1){this_ptr->running_ = false;return;}}}

这是在Transport::onPacketReceived()函数中封装的lambda对象,这个lambda对象会在WebRtcConnection所在的Worker线程中执行。

现在发送客户端的数据包已经由IOWorker线程通过nICEr库从网络接收。先是封装成DataPacket对象,

再进一步封装为一个异步的任务丢到了Woker线程的任务队列,交由Worker线程处理。Woker线程在执行上面的lambda时,会将数据包送至传送至DtlsTransport内。

上面的整个过程如下图所示:

DtlsTransport对输入数据的处理

现在经过IOWorker线程从网络接收数据包,经过流转,最终将数据包封装成一个任务丢到了Woker线程的任务队列,Worker线程执行到IOWorker线程丢过来的任务时,会通过调用了DtlsTransport::onIceData()函数,将数据包传递至DtlsTransport类内。

现在DtlsTransport对象会在Worker线程内对数据包进行处理。

void DtlsTransport::onIceData(packetPtr packet)
{.../*根据数据类型的不同,分类处理。*/if (DtlsTransport::isDtlsPacket(data, len)) { /*DTLS握手时的数据包,交由DtlsSocketContext处理。*/...} else if (this->getTransportState() == TRANSPORT_READY){/*RTP和RTCP数据包*//*如果使用了srtp,则需要使用SrtpChannel类进行解密。*/if (srtp != NULL) {...}if (auto listener = getTransportListener().lock()) {/*将解密后的数据,分发至其订阅者WebRtcConnection。*/listener->onTransportData(unprotect_packet, this);   }}
}

DtlsTransport会根据数据包的类型,进行分类处理。若是DTLS握手报文,则交由DtlsSocketContext处理,否则就是RTP/RTCP数据包。

若开启了srtp,需要对接收的数据送至SrtpChannel进行解密。最后将解密后的数据包,分发给订阅者WebRtcConnection。

上面流程如下图:

流水线-pipeline

在介绍WebRtcConnection对数据包处理之前,先介绍一下licode中的流水线。流水线处理是licode中最具特色的地方,非常便于对数据包处理的扩展。

Pipeline的实现还是有些复杂的,此处并不介绍Pipeline是怎么实现,只讲一讲如何使用流水线,流水线是如何运行起来的,以及数据包是如何在流水线内部流转的。

Pipeline类图

pipeline的handle

handle负责对媒体数据的具体处理,分为三种类型:IN、OUT、BOTH。

在licode中,每个与客户端连接的WebRtcConnection内,都会有两条双向的流水线。一条在WebRtcConnection类内,另一条在MediaStream类内。

流水线是双向的,在WebRtcConnection类内的流水线,IN模块只用于处理DTLSTransport类发送过来的数据,也就是收到的网络数据。OUT模块只用于MediaStream发送过来的数据,数据经过OUT模块处理后发送给DTLSTransport。BOTH模块会处理这两个方向上的数据。

IN类型

IN类型的模块,需要继承自InboundHandler类,其中最重要的是read()。流水线的上一个模块在处理完数据包后,会调用本模块的read()函数,将数据包交由本模块。所以这个函数用于接收上个处理模块发送过来的数据包。当在本模块中处理完毕后,需要调用fireRead()函数将数据包传给下一个模块。

void PacketCodecParser::read(Context *ctx, std::shared_ptr<DataPacket> packet)
{RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);if (!chead->isRtcp() && enabled_) {...}/*传给下一个模块*/ctx->fireRead(std::move(packet));
}

PacketCodecParser是一个IN类型的模块,上一个模块处理完数据后,会将数据包丢到这里的read()函数,在这个函数中处理完毕后,会通过ctx->fireRead(std::move(packet))将数据包丢给下一个处理模块。

OUT类型

OUT类型的模块,需要继承自OutboundHandler类。write()函数等同于IN类中的read()函数,用于接收上一个模块发送过来的数据包。

void FecReceiverHandler::write(Context *ctx, std::shared_ptr<DataPacket> packet) {if (enabled_ && packet->type == VIDEO_PACKET) {...}/*传给下一个模块*/ctx->fireWrite(std::move(packet));
}

FecReceiverHandler是一个OUT类型的模块,在write()函数中接收上一个模块传入的数据包,处理结束后,通过ctx->fireWrite(std::move(packet))传递给下一个模块。

BOTH类型

BOTH类型的模块,需要继承自Handler类。在WebRtcConnection流水线中,read()函数用于接收从网络方向来的数据包,write()用于接收向网络方向发送的数据包。

void RtcpProcessorHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (packet->data);if (chead->isRtcp()) {...}processor_->checkRtcpFb();/*丢给下一个模块*/ctx->fireRead(std::move(packet));
}void RtcpProcessorHandler::write(Context *ctx, std::shared_ptr<DataPacket> packet) {RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);if (chead->isFeedback()) {...}/*丢给下一个模块*/ctx->fireWrite(std::move(packet));
}

RtcpProcessorHandler是一个BOTH类型的模块,相当于同时具有IN类型和OUT类型。read()函数同IN类型,write()函数同OUT类型。

WebRtcConnection中流水线的建立

void WebRtcConnection::initializePipeline()
{.../*向流水线中添加处理模块*/pipeline_->addFront(std::make_shared<ConnectionPacketReader>(this));    pipeline_->addFront(std::make_shared<SenderBandwidthEstimationHandler>());  pipeline_->addFront(std::make_shared<RtpPaddingManagerHandler>());       pipeline_->addFront(std::make_shared<ConnectionPacketWriter>(this));     /*初始化流水线*/pipeline_->finalize();...
}

调用addFront()向流水线中添加处理模块,IN模块添加到PipelineBase::inCtxs_中,OUT模块添加到PipelineBase::outCtxs_中,BOTH模块将同时添加到PipelineBase::inCtxs_和PipelineBase::outCtxs_中。

void Pipeline::finalize()
{...if (!inCtxs_.empty()) {front_ = dynamic_cast<InboundLink*>(inCtxs_.front());/*将inCtxs保存的模块组成一条链表*/for (size_t i = 0; i < inCtxs_.size() - 1; i++) {inCtxs_[i]->setNextIn(inCtxs_[i+1]);}/*最后一个结点的next指针置为nullptr*/inCtxs_.back()->setNextIn(nullptr);}...if (!outCtxs_.empty()) {back_ = dynamic_cast<OutboundLink*>(outCtxs_.back());/*将outCtxs保存的模块组成一条链表*/for (size_t i = outCtxs_.size() - 1; i > 0; i--) {outCtxs_[i]->setNextOut(outCtxs_[i-1]);}/*最开始一个结点的next指针置为nullptr*/outCtxs_.front()->setNextOut(nullptr);}
...
}

Pipeline::finalize()函数会将流水线按照类型,组成两条链表。

在WebRtcConnection流水线中,front_指向的是网络输入数据处理的流水线;back_用于指向处理输出到网络数据的流水线。

建立好的流水线如下图:

数据包在流水线中流转

输入流水线的流转
void RtpPaddingManagerHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {ctx->fireRead(std::move(packet));   /*丢给下一个模块*/
}

RtpPaddingManagerHandler类没有对数据包进行处理直接丢给了下一个模块。

void ContextImpl::fireRead(std::shared_ptr<DataPacket> packet) override
{auto guard = this->pipelineWeak_.lock();if (this->nextIn_) {/*直接调用下一个模块的read()处理函数*/this->nextIn_->read(std::move(packet));   }
}

RtpPaddingManagerHandler的下一个模块是SenderBandwidthEstimationHandler,且这两者是通过链表相连的,所以this->nextIn_就是SenderBandwidthEstimationHandler模块,通过调用SenderBandwidthEstimationHandler的read()函数,就将packet送至了下一个模块。

输出流水线的流转
void RtpPaddingManagerHandler::write(Context *ctx, std::shared_ptr<DataPacket> packet) {RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);...ctx->fireWrite(packet);  /*丢给下一个模块*/
}
void ContextImpl::fireWrite(std::shared_ptr<DataPacket> packet) override
{auto guard = this->pipelineWeak_.lock();if (this->nextOut_) {this->nextOut_->write(std::move(packet));    /*传递给下一个模块*/}
}

RtpPaddingManagerHandler所在链表的下一个结点是this->nextOut,也就是ConnectionPacketWriter模块,调用其write()函数就将数据包丢给ConnectionPacketWriter模块处理。

WebRtcConnection对输入数据的处理

WebRtcConnection中的流水线有两个方向,一是从DTLSTransport接收数据,数据处理后送至MediaStream,这条流水线我们称为输入流水线;另外一个是相反的方向,我们称为输出流水线。

ConnectionPacketReader模块是输入流水线的最后一个模块。ConnectionPacketWriter是输出流水线的最后一个模块。

void WebRtcConnection::onTransportData(std::shared_ptr<DataPacket> packet, Transport *transport)
{...asyncTask([packet] (std::shared_ptr<WebRtcConnection> connection) {.../*将接收的数据packet,投递到pipeline。*/if (connection->pipeline_) {connection->pipeline_->read(std::move(packet));}});
}

DTLSTransport在向WebRtcConnection传递数据的时候,使用也使用了投递任务的方式。但在创建DTLSTransport对象时,使用的就是WebRtcConnection内部的woker_,所以这两个类会在同一个Worker线程内运行。DTLSTransport直接将数据送给WebRtcConnection也是没有问题的,使用任务队列也是可以的。

WebRtcConnection接收到DTLSTransport传递过来的数据后,将其送至流水线。pipeline_->read()是输入流水线的入口。

void Pipeline::read(std::shared_ptr<DataPacket> packet)
{if (!front_) {return;}/*将数据丢给流水线的首个模块*/front_->read(std::move(packet));
}

如上图,在输入流水线中,front_指向的是RtpPaddingManagerHandler模块,所以这个模块是流水线的入口模块。

数据依次经过流水线的RtpPaddingManagerHandler模块、SenderBandwidthEstimationHandler模块,最后到达ConnectionPacketReader模块。流水线处理后的数据,会送至ConnectionPacketReader::read()函数。

void ConnectionPacketReader::read(Context *ctx, std::shared_ptr<DataPacket> packet) override
{/*将输入流水线处理后的数据包,送至WebRtcConnection的read()函数。*/connection_->read(std::move(packet));
}

经过输入流水线的处理,数据包最终来到了WebRtcConnection::read()函数。

void WebRtcConnection::read(std::shared_ptr<DataPacket> packet)
{.../*遍历每一条MediaStream*/forEachMediaStream([packet, transport, ssrc] (const std::shared_ptr<MediaStream> &media_stream){/*找到数据包对应的MediaStream*/if (media_stream->isSourceSSRC(ssrc) || media_stream->isSinkSSRC(ssrc)) {/*将数据包发送至MediaStream*/media_stream->onTransportData(packet, transport);}});}
}

WebRtcConnection从流水线收到处理后的数据包,根据数据包中的ssrc找到对应的MediaStream,之后将数据包送至MediaStream。

void MediaStream::onTransportData(std::shared_ptr<DataPacket> incoming_packet, Transport *transport)
{.../*拷贝一份数据*/std::shared_ptr<DataPacket> packet = std::make_shared<DataPacket>(*incoming_packet);
.../*投递至MediaStream线程所在的任务队列*/worker_->task([stream_ptr, packet]{.../*media stream将数据写入流水线*/if (stream_ptr->pipeline_) {/*进入流水线,开始处理。*/stream_ptr->pipeline_->read(std::move(packet));    }});
}

此时还处于WebRtcConnection所在的Worker线程,在MediaStream::onTransportData()函数内,会拷贝一份数据,将数据和对应的MediaStream打包成一个任务,然后将任务投递到MediaStream所在Worker线程的任务队列。

WebRtcConnection(std::shared_ptr<Worker> worker, ...);
MediaStream(std::shared_ptr<Worker> worker, ...);

在创建WebRtcConnection和MediaStream的时候,若使用相同的worker,则两者会在同一个Worker线程运行,否则就是在不同的Worker线程运行。在不同的Worker线程内运行,使用任务队列传递数据是必要的。

WebRtcConnection对数据包的处理如下图:

MediaStream对输入数据的处理

[stream_ptr, packet]{.../*media stream将数据写入流水线*/if (stream_ptr->pipeline_) {/*进入流水线,开始处理。*/stream_ptr->pipeline_->read(std::move(packet));    }}

MediaStream所在的Worker线程在处理任务时,会执行上面的lambda对象,也就把数据送至了MediaStream内部的输入流水线。

MediaStream内部有一条更为负责的流水线,其中输入流水线用于处理WebRtcConnection发送过来的数据,也就是从网络接收的数据,输出流水线用于处理其他WebRtcConnection的MediaStream发送过来的数据。

void Pipeline::read(std::shared_ptr<DataPacket> packet)
{if (!front_) {return;}/*将数据送至流水线的第一个模块*/front_->read(std::move(packet));
}

PacketCodecParser是流水线的第一个处理模块,front_就指向该模块,front_->read()会将数据送至PacketCodecParser模块。数据送至流水线后,流水线的各个模块开始依次处理。

void PacketReader::read(Context *ctx, std::shared_ptr<DataPacket> packet) override
{/*将流水线处理后packet重新送至MediaStream*/media_stream_->read(std::move(packet));
}

PacketReader是输入流水线的最后一个模块,经过复杂的流水线处理后,在流水线的最后一个模块中,调用MediaStream::read()将处理后的数据包,重新送回MediaStream。

void MediaStream::read(std::shared_ptr<DataPacket> packet)
{...if (isVideoSourceSSRC(recvSSRC) && video_sink) {parseIncomingPayloadType(buf, len, VIDEO_PACKET);parseIncomingExtensionId(buf, len, VIDEO_PACKET);/*将视频数据分发给订阅者*/video_sink->deliverVideoData(std::move(packet));    } else if (isAudioSourceSSRC(recvSSRC) && audio_sink) {parseIncomingPayloadType(buf, len, AUDIO_PACKET);parseIncomingExtensionId(buf, len, AUDIO_PACKET);/*将音频数据分发给订阅者*/audio_sink->deliverAudioData(std::move(packet));} ...
}

从网络接收的数据,先是经过WebRtcConnection流水线的处理,再经过MediaStream流水线的处理。现在需要将其分发给接收客户端的MediaStream了。

video_sink->deliverVideoData(std::move(packet)); 用于将视频数据分发给订阅者。

audio_sink->deliverAudioData(std::move(packet)); 用于将音频数据分发给订阅者。

void MediaSource::setVideoSink(std::weak_ptr<MediaSink> video_sink)
{boost::mutex::scoped_lock lock(monitor_mutex_);video_sink_ = video_sink;
}

MediaSource是MediaStream的父类。

接收客户端在licode中,也有一个对应的WebRtcConnection和属于它的MediaStream。在上层的应用中,需要调用发送端MediaStream中MediaSource::setVideoSink(),将接收端的MediaStream作为参数。这样接收端的MediaStream就成为了发送端MediaStream的订阅者。

/*从video_sink->deliverVideoData(std::move(packet)); 开始,以下执行的这些函数,都是接收端MediaStream对象中的成员函数。*/
int MediaSink::deliverVideoData(std::shared_ptr<DataPacket> data_packet)
{/*会调用子类的覆写的虚函数*/return deliverVideoData_(data_packet);
}/*覆写MediaSink父类的deliverVideoData_虚函数*/
int MediaStream::deliverVideoData_(std::shared_ptr<DataPacket> video_packet)
{if (video_enabled_) {sendPacketAsync(std::make_shared<DataPacket>(*video_packet));}return video_packet->length;
}void MediaStream::sendPacketAsync(std::shared_ptr<DataPacket> packet)
{.../*投递任务发送packet*/worker_->task([stream_ptr, packet]{stream_ptr->sendPacket(packet);});
}

video_sink->deliverVideoData()中video_sink表示接收端的MediaStream对象,此时还在接收端MediaStream对象所在的Worker线程,在向发送端MediaStream对象传递数据时,会封装成一个任务投递过去。发送端MediaStream对象和接收端MediaStream对象可能在同一个Woker线程,也可能不在同一个线程,所以为了避免数据竞争,需要把要传递给接收端MediaStream对象的数据封装为一个任务,投递到接收端MediaStream对象所在Worker线程的任务队列。

WebRtcConnection(std::shared_ptr<Worker> worker, std::shared_ptr<IOWorker> io_worker,...);

在为发送客户端和接收客户端分别创建WebRtcConnection对象时,有两个很重要的参数是,一个是Worker线程参数worker和IOWorker线程参数io_worker。在创建WebRtcConnection对象时,若worker参数相同,说明发送端WebRtcConnection对象和接收端WebRtcConnection对象是在同一个Worker线程内。若io_worker参数相同,则licode接收发送端的数据和发送给接收端的数据是在同一个IOWorker线程内。

至此,licode已经处理完发送端发送过来的数据了,接下来就是接收端MediaStream对象对数据的处理了。

目前的数据流程如下图:

MediaStream对输出数据的处理

接收端MediaStream对象所在的Worker线程任务队列中已经有一个,发送端MediaStream对象投递的任务。这个任务就是一个lambda对象。

/*MediaStream::sendPacketAsync()函数中封装的任务。*/
[stream_ptr, packet]{stream_ptr->sendPacket(packet);}

接收端MediaStream对象所在的Worker线程,取出这个任务,在执行这个lambda对象时,数据就被接收到了接收端MediaStream::sendPacket()函数内。

void MediaStream::sendPacket(std::shared_ptr<DataPacket> p)
{.../*将packet送至输出流水线进行处理*/if (pipeline_) {pipeline_->write(std::move(p));}
}

接收到的数据被送至了MediaStream类内的输出流水线。

void Pipeline::write(std::shared_ptr<DataPacket> packet)
{if (!back_) {return;}/*送至输出流水线的第一个处理模块中*/back_->write(std::move(packet));
}

输出流水线处理模块组成的链表中,back_指向的是RtcpProcessorHandler模块,调用RtcpProcessorHandler::write()函数,将数据送至输出流水线,开始处理数据。

被标记为OUT的是输出流处理模块,数据从第一个处理RtcpProcessorHandler开始处理,在流水线中一个模块一个模块的流转,最后到到达了流水线的最后一个处理模块PacketWriter。

void PacketWriter::write(Context *ctx, std::shared_ptr<DataPacket> packet) override
{media_stream_->write(std::move(packet));
}

在PacketWriter::write()中,数据会从流水线重新送回MediaStream内。

void MediaStream::write(std::shared_ptr<DataPacket> packet)
{if (connection_) {connection_->send(packet);}
}

MediaStream收到经过输出流水线处理的数据后,将数据送至WebRtcConnection。

void WebRtcConnection::send(std::shared_ptr<DataPacket> packet)
{asyncTask([packet] (std::shared_ptr<WebRtcConnection> connection) {if (connection->pipeline_) {/*将接收到的packet,送至输出流水线。*/connection->pipeline_->write(std::move(packet));}});
}

此时还在MediaStream所在的Worker线程,为了向WebRtcConnection传递数据,需要将数据打包成一个任务,然后投递到WebRtcConnection所在Worker线程的任务队列。

WebRtcConnection对输出数据的处理

/*封装为lambda的任务*/
[packet] (std::shared_ptr<WebRtcConnection> connection) {if (connection->pipeline_) {/*将接收到的packet,发送至下面函数中。*/connection->pipeline_->write(std::move(packet));}}

WebRtcConnection所在的Worker线程从任务队列取出上面的lambda对象,处理该任务。处理任务时,会将从MediaStream接收的数据送至输出流水线。

void Pipeline::write(std::shared_ptr<DataPacket> packet)
{if (!back_) {return;}/*送至输出流水线的第一个模块*/back_->write(std::move(packet));
}

SenderBandwidthEstimationHandler是流水线的第一个模块,所以首先送至该模块处理。

void ConnectionPacketWriter::write(Context *ctx, std::shared_ptr<DataPacket> packet) override {/*将数据送回WebRtcConnection*/connection_->write(std::move(packet));     }

ConnectionPacketWriter是流水线的最后一个模块,当流水线的数据送至该模块时,该模块会将数据重新送回WebRtcConnection。

void WebRtcConnection::write(std::shared_ptr<DataPacket> packet)
{.../*找数据所属的DTLSTransport*/Transport *transport = (bundle_ || packet->type == VIDEO_PACKET) ? video_transport_.get() : audio_transport_.get();
.../*将待发送的数据,交由DtlsTransport处理。*/transport->write(packet->data, packet->length);
}

WebRtcConnection接收到数据后,会将数据送至DTLSTransport。

DtlsTransport对输入数据的处理

void DtlsTransport::write(char* data, int len)
{...if (this->getTransportState() == TRANSPORT_READY) {.../*使用SrtpChannel对数据进行加密*/if (ice_->checkIceState() == IceState::READY) {/*将数据送至NicerConnection*/writeOnIce(comp, protectBuf_, length);     }}
}void Transport::writeOnIce(int comp, void* buf, int len)
{if (!running_){return;}/*ice将packet送至NicerConnection,由Nicer将数据发送至目的地。*/ice_->sendData(comp, buf, len);
}

DTLSTransport从WebRtcConnection接收到数据后,调用父类的Transport::writeOnIce()函数,将数据送至NicerConnection。

int NicerConnection::sendData(unsigned int component_id, const void* buf, int len)
{.../*将待发送的数据,拷贝至packet buf中。*/memcpy(packet->data, buf, len);packet->length = len;.../*向IOWorker投递发送任务*/async([nicer, packet, peer, stream, component_id, len] (std::shared_ptr<NicerConnection> this_ptr) {UINT4 r = nicer->IceMediaStreamSend(peer,stream,component_id,reinterpret_cast<unsigned char*>(packet->data),len);...});return len;
}

此时还在WebRtcConnection所在的Worker线程,现在需要将数据传递给NicerConnection所在IOWorker线程,所以将待发送的数据打包成一个任务,然后投递到NicerConnection所在IOWorker线程的任务队列。

使用nICEr库发送网络数据

/*封装为lambda的任务*/
[nicer, packet, peer, stream, component_id, len] (std::shared_ptr<NicerConnection> this_ptr) {UINT4 r = nicer->IceMediaStreamSend(peer,stream,component_id,reinterpret_cast<unsigned char*>(packet->data),len);...})int NicerInterfaceImpl::IceMediaStreamSend(nr_ice_peer_ctx *pctxp, nr_ice_media_stream *stream, int component, unsigned char *buffer, size_t length)
{return nr_ice_media_stream_send(pctxp, stream, component, buffer, length);
}

IOWorker线程会从队列中取出上面的任务,然后调用NicerInterfaceImpl::IceMediaStreamSend(),接着调用nICEr库的发送数据接口nr_ice_media_stream_send(),把需要发送给接收客户端的数据交由nICEr库负责发送。

完整流程

至此从发送客户端接收数据,经过licode内部处理,再发送给接收客户端的整个流程介绍完毕了。

整个流程如下图:

一对多

使用OneToManyProcessor类,可以将一个发送端的数据,转发给多个接收端。

OneToManyProcessor订阅发送端中的MediaStream,其他接收端的MediaStream订阅OneToManyProcessor。当发送端发送过来数据时,发送端的MediaStream将数据转发至OneToManyProcessor,接着该类再将数据分发给它的所有订阅者。

这是典型的观察者模式

注意:发送客户端MediaStream指的是licode中建立的MediaStream,它可以代表发送客户端发送的媒体数据流。接收客户端MediaStream同理。

licode源码分析-媒体数据的处理流程相关推荐

  1. licode源码分析-线程模型

    licode源码分析-线程模型 服务器一般都会服务于大量的用户,所以服务端程序的性能往往决定服务用户的多少.现在服务器上的CPU都是多核的,服务端程序为了充分发挥CPU的性能,会使用多进程或多线程.而 ...

  2. Ant-design 源码分析之数据展示(二)Badge

    Ant-design 源码分析之数据展示(二)Badge 2021SC@SDUSC 一.组件结构 1.ant代码结构 2.rc-ant代码结构 3.组件结构 ant中Badge的index.tsx中引 ...

  3. HBase源码分析之HRegion上compact流程分析(三)

    在<HBase源码分析之HRegion上compact流程分析(二)>一文中,我们没有讲解真正执行合并的CompactionContext的compact()方法.现在我们来分析下它的具体 ...

  4. Spark源码分析之Sort-Based Shuffle读写流程

    一 概述 我们知道Spark Shuffle机制总共有三种: # 未优化的Hash Shuffle:每一个ShuffleMapTask都会为每一个ReducerTask创建一个单独的文件,总的文件数是 ...

  5. 【SA8295P 源码分析】18 - Camera Bringup 流程 及 源码分析

    [SA8295P 源码分析]18 - Camera Bringup 流程 及 源码分析 一.Camera Bringup 流程 1.1 CameraConfigSA8295.c 配置文件解析 1.2 ...

  6. SOFA 源码分析 — 链路数据透传

    前言 SOFA-RPC 支持数据链路透传功能,官方解释: 链路数据透传功能支持应用向调用上下文中存放数据,达到整个链路上的应用都可以操作该数据. 使用方式如下,可分别向链路的 request 和 re ...

  7. leveldb源码分析:数据查询

    leveldb数据查询 查询的示例代码如下: string res; status = db->Get(ReadOptions(), "KeyNameExample", &a ...

  8. Nginx源码分析:master/worker工作流程概述

    nginx源码分析 nginx-1.11.1 参考书籍<深入理解nginx模块开发与架构解析> Nginx的master与worker工作模式 在生成环境中的Nginx启动模式基本都是以m ...

  9. zookeeper源码分析之三客户端发送请求流程

    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...

最新文章

  1. c# mysql executescalar_C# 操作MySQL数据库, ExecuteScalar()方法执行T-SQL语句, COUNT(*), 统计数据...
  2. 【Java多线程】实现Runnable接口方式 / 继承Thread类方式;使用synchronized锁实现线程安全;线程安全的懒汉式单例模式;死锁问题示例
  3. 面向对象 solid_用简单的英语解释面向对象程序设计的SOLID原理
  4. 二叉树的前中后层遍历
  5. Facebook宣布进一步推广Live Video功能
  6. 从棋盘左上角到右下角共有多少种走法
  7. [寒江孤叶丶的CrossApp之旅_07][入门系列]CrossApp中信息框CAAlertView的使用
  8. 解决win7任务栏谷歌浏览器chrome图标丢失、异常空白的问题
  9. Raspberry Pi树莓派分类和其相似产品介绍
  10. wifi又被隔壁老王蹭了,这样设置路由器,老王再也蹭不了你的wifi了……路由器设置实用教程基础篇
  11. win7计算机u盘不显示盘符,win7系统识别U盘但不显示盘符该如何解决?
  12. python如何编写温度转换_Python温度转换实例分析
  13. 软件测试薪资标准,软件测试月薪过万需要具备哪些技能?
  14. 12.15 村长项目借口
  15. 永恒python图片_第1章 python基础
  16. 内网时间同步,ntp与ntpdate区别,与ntp服务器搭建
  17. 北京林业大c语言程序设计考试试题,2016年北京林业大学信息学院程序设计语言、数据结构(上机操作)复试笔试最后押题五套卷...
  18. 迷宫寻宝(一) 搜索
  19. Weex工具链的奥秘
  20. 通过表结构生成数据库设计文档

热门文章

  1. 前端一定得学ajax吗,前端学习之ajax
  2. AI:陆奇博士(原微软全球执行副总裁/百度COO/现奇绩创坛创始人兼CEO)演讲之《正视挑战,把握创业创新机会》
  3. springmvc配置过滤器
  4. Ros机器人/物联网操作系统
  5. 一张图了解app测试点:专项测试
  6. 打印一个我们熟知的乘法口诀表!
  7. python k线顶分型_K线战法之『顶底分型』高手懂的!
  8. ViveInputUtility-瞬移(3)
  9. iwebsec靶场 SQL注入漏洞通关笔记4- sleep注入(时间型盲注)
  10. MAC地址,IP地址以及ARP协议