EOS中plugin之net_plugin

这部分重点介绍EOS中的服务器端部分nodeos启动之后开启的另外一个重要的插件——net_plugin,这个插件主要负责服务器在网络中的接入、同步区块信息、断开等功能。对于这个插件,首先从其类的定义开始了解。

class net_plugin : public appbase::plugin<net_plugin>
{public:net_plugin();virtual ~net_plugin();APPBASE_PLUGIN_REQUIRES((chain_plugin))    // net_plugin这个插件的启动,依赖chain_plugin插件virtual void set_program_options(options_description& cli, options_description& cfg) override;void plugin_initialize(const variables_map& options);  // 插件初始化void plugin_startup();                                 // 插件的启动void plugin_shutdown();                                // 插件关闭void   broadcast_block(const chain::signed_block &sb); // 广播区块string                       connect( const string& endpoint ); // 连接其他端点string                       disconnect( const string& endpoint ); // 断开连接optional<connection_status>  status( const string& endpoint )const;   // 查看与某个端点的链接状态vector<connection_status>    connections()const;                   // 查看连接状态size_t num_peers() const;                                          // 查看建立连接的端点个数private:std::unique_ptr<class net_plugin_impl> my;   // 和producer_plugin_imple一样,这个插件负责网络中的具体操作
};

net_plugin插件的代码还是比较容易理解的,net_plugin的初始化函数非常简单,代码如下:

net_plugin::net_plugin():my( new net_plugin_impl ) {my_impl = my.get();
}

初始化函数中生成了一阁net_plugin_impl的实例,随后将my的指针赋值给了net_plugin_impl类中的定义的静态指针,这个静态指针的定义如下:

static net_plugin_impl *my_impl;

再nodeos的main函数中,net_plugin首先会初始化,随后调用其plugin_initialize函数和EOS中lugin之producer_plugin的介绍的procuder_plugin类初始化方式时一样的,先寻找这个插件依赖的插件,然后初始化依赖的插件、一些启动参数,设置心跳计时器等。

nodeos中初始化完毕,随后调用net_plugin的plugin_startup函数,该函数代码如下:

void net_plugin::plugin_startup() {my->producer_plug = app().find_plugin<producer_plugin>();if( my->acceptor ) {// 常见的网络服务操作,打开监听服务,设置选项,绑定地址,启动监听my->acceptor->open(my->listen_endpoint.protocol());my->acceptor->set_option(tcp::acceptor::reuse_address(true));try {// acceptor 来自于boost::asio::ip::tcp,即tcp::acceptormy->acceptor->bind(my->listen_endpoint);} catch (const std::exception& e) {ilog("net_plugin::plugin_startup failed to bind to port ${port}",("port", my->listen_endpoint.port()));throw e;}my->acceptor->listen();ilog("starting listener, max clients is ${mc}",("mc",my->max_client_count));my->start_listen_loop();   // 循环监听函数}chain::controller&cc = my->chain_plug->chain();{cc.accepted_block.connect(  boost::bind(&net_plugin_impl::accepted_block, my.get(), _1));}my->incoming_transaction_ack_subscription = app().get_channel<channels::transaction_ack>().subscribe(boost::bind(&net_plugin_impl::transaction_ack, my.get(), _1));if( cc.get_read_mode() == chain::db_read_mode::READ_ONLY ) {my->max_nodes_per_host = 0;ilog( "node in read-only mode setting max_nodes_per_host to 0 to prevent connections" );}// 启动连接和交易到期的监视my->start_monitors();for( auto seed_node : my->supplied_peers ) {connect( seed_node );// 连接种子节点,接入p2p网络}if(fc::get_logger_map().find(logger_name) != fc::get_logger_map().end())logger = fc::get_logger_map()[logger_name];
}

首先打开监听的端口号,设置相关协议,随后绑定端口号,然后开始监听网络中的信息,同时让本端点连接到网络中的种子节点,以此连接EOS中的p2p网络。这里面最重要的2个函数是 my->start_listen_loop()以及my->start_monitors()函数,my->start_listen_loop,通过函数名称可以断定主要用来不断地从网络中监听网络中发送的信息,my->start_monitors()应该是进行监听,但是到底监听什么,我们还不得而知。

这里暂时先不对net_plugin_impl进行具体解析,因为这并不影响我们对这两个函数的分析,另外一方面,net_plugin_impl的介绍对于这两个函数的分析,帮助不大。
因此,我们直接进入start_listen_loop函数一探究竟。

/// 该函数循环监听信息
void net_plugin_impl::start_listen_loop() {auto socket = std::make_shared<tcp::socket>( std::ref( app().get_io_service() ) );acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) {if( !ec ) {uint32_t visitors = 0;uint32_t from_addr = 0;auto paddr = socket->remote_endpoint(ec).address();if (ec) {fc_elog(logger,"Error getting remote endpoint: ${m}",("m", ec.message()));}else {for (auto &conn : connections) {if(conn->socket->is_open()) {if (conn->peer_addr.empty()) {visitors++;boost::system::error_code ec;if (paddr == conn->socket->remote_endpoint(ec).address()) {from_addr++;}}}}if (num_clients != visitors) {ilog("checking max client, visitors = ${v} num clients ${n}",("v",visitors)("n",num_clients));num_clients = visitors;}if( from_addr < max_nodes_per_host && (max_client_count == 0 || num_clients < max_client_count )) {++num_clients;connection_ptr c = std::make_shared<connection>( socket );connections.insert( c );start_session( c );}else {if (from_addr >= max_nodes_per_host) {fc_elog(logger, "Number of connections (${n}) from ${ra} exceeds limit",("n", from_addr+1)("ra",paddr.to_string()));}else {fc_elog(logger, "Error max_client_count ${m} exceeded",( "m", max_client_count) );}socket->close();}}} else {elog( "Error accepting connection: ${m}",( "m", ec.message() ) );// For the listed error codes below, recall start_listen_loop()switch (ec.value()) {case ECONNABORTED:case EMFILE:case ENFILE:case ENOBUFS:case ENOMEM:case EPROTO:break;default:return;}}start_listen_loop();});
}

start_listen_loop()函数中最重要的一个函数是,监听到消息之后的start_session()函数,表示监听到了新的链接,于是开始会话。start_session()函数内容如下所示。

bool net_plugin_impl::start_session(const connection_ptr& con) {boost::asio::ip::tcp::no_delay nodelay( true );boost::system::error_code ec;con->socket->set_option( nodelay, ec );if (ec) {// 如果接受数据出错, 直接关闭连接,写日志elog( "connection failed to ${peer}: ${error}",( "peer", con->peer_name())("error",ec.message()));con->connecting = false;close(con);return false;}else {// 读取数据,已经开启的session+1start_read_message( con );++started_sessions;return true;// for now, we can just use the application main loop.//     con->readloop_complete  = bf::async( [=](){ read_loop( con ); } );//     con->writeloop_complete = bf::async( [=](){ write_loop con ); } );}
}

其中重要的是start_read_message,即回话过程中读取数据,即start_read_message()函数,其具体内容如下所示。

void net_plugin_impl::start_read_message(const connection_ptr& conn) {try {if(!conn->socket) {return;}connection_wptr weak_conn = conn;std::size_t minimum_read = conn->outstanding_read_bytes ? *conn->outstanding_read_bytes : message_header_size;// 默认为false,在plugin_initialized中使用if (use_socket_read_watermark) {const size_t max_socket_read_watermark = 4096;std::size_t socket_read_watermark = std::min<std::size_t>(minimum_read, max_socket_read_watermark);boost::asio::socket_base::receive_low_watermark read_watermark_opt(socket_read_watermark);conn->socket->set_option(read_watermark_opt);}auto completion_handler = [minimum_read](boost::system::error_code ec, std::size_t bytes_transferred) -> std::size_t {if (ec || bytes_transferred >= minimum_read ) {return 0;} else {return minimum_read - bytes_transferred;}};// 从stream中异步读取固定大小的数据/* async_read(AsyncReadStream &S, const MutableBufferSequence& buffers, ReadHandler&& handler, )从*conn->socket中读取data,读到buffers中去,buffers的大小告诉系统读取多少handler是读取数据完毕之后调用的函数 */boost::asio::async_read(*conn->socket,conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), completion_handler,[this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) {auto conn = weak_conn.lock();if (!conn) {return;}conn->outstanding_read_bytes.reset();try {if( !ec ) {if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {elog("async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()));}EOS_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write(), plugin_exception, "");conn->pending_message_buffer.advance_write_ptr(bytes_transferred);while (conn->pending_message_buffer.bytes_to_read() > 0) {uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();if (bytes_in_buffer < message_header_size) {conn->outstanding_read_bytes.emplace(message_header_size - bytes_in_buffer);break;} else {uint32_t message_length;auto index = conn->pending_message_buffer.read_index();conn->pending_message_buffer.peek(&message_length, sizeof(message_length), index);if(message_length > def_send_buffer_size*2 || message_length == 0) {boost::system::error_code ec;elog("incoming message length unexpected (${i}), from ${p}",("i", message_length)("p",boost::lexical_cast<std::string>(conn->socket->remote_endpoint(ec))));close(conn);return;}auto total_message_bytes = message_length + message_header_size;if (bytes_in_buffer >= total_message_bytes) {conn->pending_message_buffer.advance_read_ptr(message_header_size);// 这一部分是网络通信的内容,对于其中细节不甚了解// 接收的数据传递到pending_message_buffer中,// process_next_message中进行处理从pending_message_buffer中处理数据if (!conn->process_next_message(*this, message_length)) {return;}} else {auto outstanding_message_bytes = total_message_bytes - bytes_in_buffer;auto available_buffer_bytes = conn->pending_message_buffer.bytes_to_write();if (outstanding_message_bytes > available_buffer_bytes) {conn->pending_message_buffer.add_space( outstanding_message_bytes - available_buffer_bytes );}conn->outstanding_read_bytes.emplace(outstanding_message_bytes);break;}}}start_read_message(conn);} else {auto pname = conn->peer_name();if (ec.value() != boost::asio::error::eof) {elog( "Error reading message from ${p}: ${m}",("p",pname)( "m", ec.message() ) );} else {ilog( "Peer ${p} closed connection",("p",pname) );}close( conn );}}catch(const std::exception &ex) {string pname = conn ? conn->peer_name() : "no connection name";elog("Exception in handling read data from ${p} ${s}",("p",pname)("s",ex.what()));close( conn );}catch(const fc::exception &ex) {string pname = conn ? conn->peer_name() : "no connection name";elog("Exception in handling read data ${s}", ("p",pname)("s",ex.to_string()));close( conn );}catch (...) {string pname = conn ? conn->peer_name() : "no connection name";elog( "Undefined exception hanlding the read data from connection ${p}",( "p",pname));close( conn );}} );} catch (...) {string pname = conn ? conn->peer_name() : "no connection name";elog( "Undefined exception handling reading ${p}",("p",pname) );close( conn );}
}

抛去其中try catch语句,我们重点看到conn->process_next_message函数,这个函数正如注释所说,从接受到的pending_message_buffer中处理数据,我们继续追踪这个处理函数,如下所示。

// 该函数用于数据同步,使用中心消息处理系统处理数据
bool connection::process_next_message(net_plugin_impl& impl, uint32_t message_length) {try {auto ds = pending_message_buffer.create_datastream();net_message msg;fc::raw::unpack(ds, msg);  // 将解压的ds信息放入msg中msg_handler m(impl, shared_from_this() );// 判断msg的类型,msg可以是带签名的区块或者打包后的交易// 如果是区块,获取signed_block后放入m中// 如果是trx,则获取packed_trx后放入m中if( msg.contains<signed_block>() ) {m( std::move( msg.get<signed_block>() ) );} else if( msg.contains<packed_transaction>() ) {m( std::move( msg.get<packed_transaction>() ) );} else {msg.visit( m );}} catch(  const fc::exception& e ) {edump((e.to_detail_string() ));impl.close( shared_from_this() );return false;}return true;
}

其中重要的是msg_handler类型,这里面分别针对消息不同的类型做不同的处理,首先查看针对如果msg中包含了signed_block,则进入处理block函数;如果msg中包含packed_transaction,则进入处理trx的函数。首先查看msg_handler是如何处理signed_block的,处理区块的函数如下。收到区块之后验证区块,验证通过则接受区块,验证失败则拒绝区块。

   // 如果从网络中收到一个区块,执行相应的处理void net_plugin_impl::handle_message(const connection_ptr& c, const signed_block_ptr& msg) {controller &cc = chain_plug->chain();block_id_type blk_id = msg->id();uint32_t blk_num = msg->block_num();fc_dlog(logger, "canceling wait on ${p}", ("p",c->peer_name()));c->cancel_wait();try {if( cc.fetch_block_by_id(blk_id)) {// recv_block函数具体含义为止,同步区块数据?不应该是检查在先?sync_master->recv_block(c, blk_id, blk_num);return;}} catch( ...) {// should this even be caught?elog("Caught an unknown exception trying to recall blockID");}dispatcher->recv_block(c, blk_id, blk_num);  // 进行报告,我已经从连接c收到一个编号blk_id和blk_num的区块?fc::microseconds age( fc::time_point::now() - msg->timestamp);peer_ilog(c, "received signed_block : #${n} block age in secs = ${age}",("n",blk_num)("age",age.to_seconds()));go_away_reason reason = fatal_other;try {// 在chain_plug中再检查其具体含义,看catch中的内容,应该是使用chain_plug对区块进行检查// 如果检查无错误,reason的值应该是no_reasonchain_plug->accept_block(msg); //, sync_master->is_active(c));reason = no_reason;} catch( const unlinkable_block_exception &ex) {peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));reason = unlinkable;} catch( const block_validate_exception &ex) {peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));elog( "block_validate_exception accept block #${n} syncing from ${p}",("n",blk_num)("p",c->peer_name()));reason = validation;} catch( const assert_exception &ex) {peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));elog( "unable to accept block on assert exception ${n} from ${p}",("n",ex.to_string())("p",c->peer_name()));} catch( const fc::exception &ex) {peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));elog( "accept_block threw a non-assert exception ${x} from ${p}",( "x",ex.to_string())("p",c->peer_name()));reason = no_reason;} catch( ...) {peer_elog(c, "bad signed_block : unknown exception");elog( "handle sync block caught something else from ${p}",("num",blk_num)("p",c->peer_name()));}update_block_num ubn(blk_num);if( reason == no_reason ) {for (const auto &recpt : msg->transactions) {auto id = (recpt.trx.which() == 0) ? recpt.trx.get<transaction_id_type>() : recpt.trx.get<packed_transaction>().id();auto ltx = local_txns.get<by_id>().find(id);if( ltx != local_txns.end()) {local_txns.modify( ltx, ubn );}auto ctx = c->trx_state.get<by_id>().find(id);if( ctx != c->trx_state.end()) {c->trx_state.modify( ctx, ubn );}}// 这里再次进行recv_block,不明白其含义sync_master->recv_block(c, blk_id, blk_num);}else {// 验证区块过程中出现错误,直接拒绝这个区块sync_master->rejected_block(c, blk_num);}}

对交易的验证与对区块的处理方式相同,验证交易,如果本地存在此交易,则丢弃,否则开始验证交易,若验证通过则接受交易并广播交易,否则拒绝交易。

void net_plugin_impl::handle_message(const connection_ptr& c, const packed_transaction_ptr& trx) {fc_dlog(logger, "got a packed transaction, cancel wait");peer_ilog(c, "received packed_transaction");controller& cc = my_impl->chain_plug->chain();if( cc.get_read_mode() == eosio::db_read_mode::READ_ONLY ) {fc_dlog(logger, "got a txn in read-only mode - dropping");return;}if( sync_master->is_active(c) ) {fc_dlog(logger, "got a txn during sync - dropping");return;}auto ptrx = std::make_shared<transaction_metadata>( trx );const auto& tid = ptrx->id;c->cancel_wait();if(local_txns.get<by_id>().find(tid) != local_txns.end()) {fc_dlog(logger, "got a duplicate transaction - dropping");return;}dispatcher->recv_transaction(c, tid);// 验证交易chain_plug->accept_transaction(ptrx, [c, this, ptrx](const static_variant<fc::exception_ptr, transaction_trace_ptr>& result) {if (result.contains<fc::exception_ptr>()) {peer_dlog(c, "bad packed_transaction : ${m}", ("m",result.get<fc::exception_ptr>()->what()));} else {auto trace = result.get<transaction_trace_ptr>();if (!trace->except) {fc_dlog(logger, "chain accepted transaction");// 广播交易this->dispatcher->bcast_transaction(ptrx);return;}peer_elog(c, "bad packed_transaction : ${m}", ("m",trace->except->what()));}// 拒绝交易dispatcher->rejected_transaction(ptrx->id);});
}

至此,net_plugin以及net_plugin_impl插件中的比较重要的函数已经分析完毕,由于其中细节错综复杂,因此在分析过程中抓住主要脉络进行分析,对于其他细节内容并没有深究。

EOS中plugin之net_plugin相关推荐

  1. CRM中Plugin开发如何将功能放入多个模块

    近期做CRM的Plugin开发,发现Plugin中的功能必须全部放在一个DLL里,感觉不爽,如果我要用的功能在别人提供的DLL里,或有些功能需要在多个地方使用岂不是很难过? 用了VS2012的Dyna ...

  2. EOS中的CPU、NET和RAM是什么?

    不管是开发还是投资或者纯科普扫盲,刚接触到柚子EOS区块链的时候都会先遇到CPU.NET和RAM的概念,他们到底是什么,跟我们平时了解到的计算机中的概念有什么关系? EOS介绍 EOS是一种区块链架构 ...

  3. eos源码赏析(二十三):默克尔树在EOS中的应用(上)

    前面文章中在分析push_transactioneos源码赏析(二十):EOS智能合约之push_transaction的天龙八"步"以及区块签名eos源码赏析(二十一):EOS智 ...

  4. EOS中JAVA从Linux下载文件,教程 - 在Linux上安装EOS

    你想学习如何在Linux上安装EOS? 在本教程中,我们将向您展示如何在运行Ubuntu Linux的计算机上安装EOS. EOS相关教程: 在此页面上,我们提供了与EOS.IO相关的教程列表的快速访 ...

  5. AS 中 Plugin for Gradle 和 Gradle 之间的版本对应关系

    Plugin for Gradle 和 Gradle 之间的版本对应关系  来源:https://developer.android.com/studio/releases/gradle-plugin ...

  6. 普元EOS中, 子系统和portal不在同一个域中,使用jquery的jsonp来解决portal跨域访问

    转至元数据起始 [背景] 子系统和portal不在同一个域中且项目中要求不能使用nginx.apache等反向代理软件,故使用jsonp从代码角度解决ajax跨域问题 [实现思路] 通过jquery的 ...

  7. EOS中如何实现导入导出excel文件

    阅读原文 导入导出excel文件 场景描述 将数据库表中的数据按照定义的EXCEL模板进行导出,下载到本地: 将EXCEL中的数据导入数据库相应的表中. 场景目标 通过本场景,解决EXCEL的导入导出 ...

  8. codeblocks中plugin的实现

    快乐虾 http://blog.csdn.net/lights_joy/ lights@hb165.com 本文适用于 codeblocks-8.02 vs2005 欢迎转载,但请保留作者信息 1.1 ...

  9. Qt文档阅读笔记-Qt4 Lower-Level API扩展Qt Applications(Qt4中Plugin的使用)解析与实例

    目录 官方解析 博主栗子 官方解析 Qt应用程序可以对插件进行扩展,要使用QPluginLoader这个类进行加载.插件可以提供任意的功能,而且不限制数据库驱动,图像格式,以及其他的Qt功能插件. 当 ...

最新文章

  1. DLINQ *.dbml文件该属于哪一层
  2. [翻译] Shimmer
  3. 200804C阶段一变量生存期和结构体
  4. SpringBoot @ConfigurationProperties详解
  5. SAP CRM和SAP Hybris的订单修改记录
  6. Delphi 的字符及字符串[4] - 字符串、字符指针与字符数组
  7. lucene学习笔记_学习Lucene
  8. 求解ax + by = c 这类方程
  9. C++ Primer笔记整理
  10. 服务器压力测试怎么做_做手游怎么选服务器?
  11. 自动变量和开辟内存的生存期和作用域探讨
  12. 力扣836.矩形重叠
  13. BZOJ3270 博物馆(高斯消元+概率期望)
  14. 卧槽!“饿了么”把“饿了吗”告了,网友:判决结果没想到~
  15. iframe页面使用Js实现父页面和子页面通信
  16. 金格iweboffice axios上传单文件和多文件js代码
  17. UML用例图怎么画 有手就会
  18. python汇率转换
  19. 文本编辑器Geany
  20. maven仓库类型说明 hosted/proxy/group

热门文章

  1. 刷脸支付真的安全吗?
  2. Android4.1 触摸屏(TP)划线曲折、不圆润的解决方法
  3. python使用shutil copyfile 复制文件
  4. ubuntu篇---系统下查看CPU和GPU温度
  5. 24【备忘录设计模式】
  6. 拓嘉辰丰:拼多多店铺权重与转化有关系吗
  7. 游戏配音教程分享给你
  8. hdu1852 Beijing 2008(约数之和 : 无逆元除法取模 | 等比数列分治求和)
  9. ajax应用设计模式,Ajax设计模式下Web开发的研究与应用
  10. /u 反斜杠u 编码总结