原文地址:http://www.php-master.com/post/342621.html

序言

本文只是结合GatewayWorker和Workerman的官方文档和源码,深入了解执行过程。以便更深入的了解并使用

GatewayWorker基于Workerman开发的一个项目框架。Register进程负责保存Gateway进程和BusinessWorker进程的地址,建立两者的连接。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Register、Gateway、BusinessWorker进程都是继承Worker类实现各自的功能,所以了解GatewayWorker框架的内部执行过程,需要优先理解Worker的内容

GatewayWorker目录结构

├── Applications // 这里是所有开发者应用项目
│   └── YourApp  // 其中一个项目目录,目录名可以自定义
│       ├── Events.php // 开发者只需要关注这个文件
│       ├── start_gateway.php // gateway进程启动脚本,包括端口        号等设置
│       ├── start_businessworker.php // businessWorker进程启动  脚本
│       └── start_register.php // 注册服务启动脚本
│
├── start.php // 全局启动脚本,此脚本会依次加载Applications/项目/start_*.php启动脚本
│
└── vendor    // GatewayWorker框架和Workerman框架源码目  录,此目录开发者不用关心

start.php 为启动脚本,在该脚本中,统一加载start_gateway.php start_businessworker.php start_register.php进程脚本,最后通过Worker::runAll();运行所有服务。

工作原理

1、Register、Gateway、BusinessWorker进程启动
2、Gateway、BusinessWorker进程启动后向Register服务进程发起长连接注册自己
3、Register服务收到Gateway的注册后,把所有Gateway的通讯地址保存在内存中
4、Register服务收到BusinessWorker的注册后,把内存中所有的Gateway的通讯地址发给BusinessWorker
5、BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway
6、如果运行过程中有新的Gateway服务注册到Register(一般是分布式部署加机器),则将新的Gateway内部通讯地址列表将广播给所有BusinessWorker,BusinessWorker收到后建立连接
7 、如果有Gateway下线,则Register服务会收到通知,会将对应的内部通讯地址删除,然后广播新的内部通讯地址列表给所有BusinessWorker,BusinessWorker不再连接下线的Gateway
8、至此Gateway与BusinessWorker通过Register已经建立起长连接
9、客户端的事件及数据全部由Gateway转发给BusinessWorker处理,BusinessWorker默认调用Events.php中的onConnect onMessage onClose处理业务逻辑。
10、BusinessWorker的业务逻辑入口全部在Events.php中,包括onWorkerStart进程启动事件(进程事件)、onConnect连接事件(客户端事件)、onMessage消息事件(客户端事件)、onClose连接关闭事件(客户端事件)、onWorkerStop进程退出事件(进程事件)

1 Register、Gateway、BusinessWorker进程启动

项目根目录下的start.php 为启动脚本,在该脚本中,加载start_gateway.php start_businessworker.php start_register.php进程脚本,完成各个服务的Worker初始化:

// 加载所有Applications/*/start.php,以便启动所有服务
foreach(glob(__DIR__.'/Applications/*/start*.php') as $start_file)
{require_once $start_file;
}

最后通过Worker::runAll();运行所有服务。

// 运行所有服务
Worker::runAll();

运行所有服务,先看一遍runAll()方法的执行内容

public static function runAll()
{// 检查运行环境self::checkSapiEnv();//初始化环境变量self::init();// 解析命令self::parseCommand();// 尝试以守护进程模式运行self::daemonize();// 初始化所有worker实例,主要是监听端口self::initWorkers();// 初始化所有信号处理函数self::installSignal();// 保存主进程pidself::saveMasterPid();// 展示启动界面self::displayUI();// 创建子进程(worker进程),然后给每个子进程绑定loop循环监听事件tcpself::forkWorkers();// 尝试重定向标准输入输出self::resetStd();// 监控所有子进程(worker进程)self::monitorWorkers();
}

self::init()初始化环境变量中,有以下部分代码,保存$_idMap从PID映射到工作进程ID

// Init data for worker id.self::initId();
protected static function initId(){foreach (self::$_workers as $worker_id => $worker) {$new_id_map = array();for($key = 0; $key < $worker->count; $key++) {$new_id_map[$key] = isset(self::$_idMap[$worker_id]      [$key]) ? self::$_idMap[$worker_id][$key] : 0;}self::$_idMap[$worker_id] = $new_id_map;}}

self::forkWorkers()方法通过循环self::$_workers数组,fork各自worker的count数量的进程。然后通过调用

$worker->run();

运行当前worker实例,在run方法中通过

  if (!self::$globalEvent) {$event_loop_class = self::getEventLoopName();self::$globalEvent = new $event_loop_class;// Register a listener to be notified when server socket is ready to read.if ($this->_socketName) {if ($this->transport !== 'udp') {self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,array($this, 'acceptConnection'));} else {self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,array($this, 'acceptUdpConnection'));}}

获取一个当前可用的事件轮询方式,然后根据当前的协议类型添加一个监听到事件轮询中
然后,尝试出发当前进程模型的onWorkerStart回调,此回调会在Gateway类以及BusinessWorker类中都会定义,代码

 if ($this->onWorkerStart) {try {call_user_func($this->onWorkerStart, $this);} catch (\Exception $e) {self::log($e);// Avoid rapid infinite loop exit.sleep(1);exit(250);} catch (\Error $e) {self::log($e);// Avoid rapid infinite loop exit.sleep(1);exit(250);}}

最后,执行事件的循环等待socket事件,处理读写等操作,代码

 // Main loop.self::$globalEvent->loop();

以上是runAll()方法的部分内容,会在了解GatewayWorker的工作原理的时候用到

2.1 Gateway进程向Register服务进程发起长连接注册自己

初始化Gateway

$gateway = new Gateway("text://0.0.0.0:8383");

在Gateway类中重写run方法,当调用runAll()方法启动进程时,fork进程之后,运行worker实例的时候,会调用到此重写的run方法

public function run()
{// 保存用户的回调,当对应的事件发生时触发$this->_onWorkerStart = $this->onWorkerStart;$this->onWorkerStart  = array($this, 'onWorkerStart');// 保存用户的回调,当对应的事件发生时触发$this->_onConnect = $this->onConnect;$this->onConnect  = array($this, 'onClientConnect');// onMessage禁止用户设置回调$this->onMessage = array($this, 'onClientMessage');// 保存用户的回调,当对应的事件发生时触发$this->_onClose = $this->onClose;$this->onClose  = array($this, 'onClientClose');// 保存用户的回调,当对应的事件发生时触发$this->_onWorkerStop = $this->onWorkerStop;$this->onWorkerStop  = array($this, 'onWorkerStop');$this->_startTime = time();// 运行父方法parent::run();
}

定义了$this->onWorkerStart回调,

$this->onWorkerStart  = array($this, 'onWorkerStart');

执行到Worker类中的run()方法时,被触发。即,上边提到的

Worker脚本中的run方法

调用Gateway类中的onWorkerStart方法,代码

public function onWorkerStart()
{$this->lanPort = $this->startPort + $this->id;if ($this->pingInterval > 0) {$timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval / 2 : $this->pingInterval;Timer::add($timer_interval, array($this, 'ping'));}if ($this->lanIp !== '127.0.0.1') {Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingBusinessWorker'));}if (strpos($this->registerAddress, '127.0.0.1') !== 0) {Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingRegister'));}if (!class_exists('\Protocols\GatewayProtocol')) {class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');}// 初始化 gateway 内部的监听,用于监听 worker 的连接已经连接上发来的数据$this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");$this->_innerTcpWorker->listen();// 重新设置自动加载根目录Autoloader::setRootPath($this->_autoloadRootPath);// 设置内部监听的相关回调$this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');$this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect');$this->_innerTcpWorker->onClose   = array($this, 'onWorkerClose');// 注册 gateway 的内部通讯地址,worker 去连这个地址,以便 gateway 与 worker 之间建立起 TCP 长连接$this->registerAddress();if ($this->_onWorkerStart) {call_user_func($this->_onWorkerStart, $this);}
}

$this->startPort : 内部通讯起始端口,假如$gateway->count=4,起始端口为4000,可在gateway启动脚本中自定义
$this->id : 基于worker实例分配的进程编号,当前从0开始,根据count自增。在fork进程的时候生成

Worker.php

$this->_innerTcpWorker:用于监听 worker 的连接已经连接上发来的数据。在工作原理5中,BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway以及其他两者之间的通信(连接,消息,关闭)会被调用
$this->registerAddress(): 代码中$this->registerAddress是在start_gateway.php初始化Gateway类之后定义的。该端口是Register进程所监听。此处异步的向Register进程发送数据,存储当前 Gateway 的内部通信地址

public function registerAddress()
{$address                   = $this->lanIp . ':' . $this->lanPort;$this->_registerConnection = new AsyncTcpConnection("text://{$this->registerAddress}");$this->_registerConnection->send('{"event":"gateway_connect", "address":"' . $address . '", "secret_key":"' . $this->secretKey . '"}');$this->_registerConnection->onClose = array($this, 'onRegisterConnectionClose');$this->_registerConnection->connect();
}

$this->lanIp: Gateway所在服务器的内网IP

2.2 BusinessWorker进程向Register服务进程发起长连接注册自己

BusinessWorker类中同样重写run方法,定义了$this->onWorkerStart

 public function run(){$this->_onWorkerStart  = $this->onWorkerStart;$this->_onWorkerReload = $this->onWorkerReload;$this->_onWorkerStop = $this->onWorkerStop;$this->onWorkerStop   = array($this, 'onWorkerStop');$this->onWorkerStart   = array($this, 'onWorkerStart');$this->onWorkerReload  = array($this, 'onWorkerReload');parent::run();}

执行Worker类中的run方法,触发BusinessWorker中的onWorkerStart

protected function onWorkerStart()
{if (!class_exists('\Protocols\GatewayProtocol')) {class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');}$this->connectToRegister();\GatewayWorker\Lib\Gateway::setBusinessWorker($this);\GatewayWorker\Lib\Gateway::$secretKey = $this->secretKey;if ($this->_onWorkerStart) {call_user_func($this->_onWorkerStart, $this);}if (is_callable($this->eventHandler . '::onWorkerStart')) {call_user_func($this->eventHandler . '::onWorkerStart', $this);}if (function_exists('pcntl_signal')) {// 业务超时信号处理pcntl_signal(SIGALRM, array($this, 'timeoutHandler'), false);} else {$this->processTimeout = 0;}// 设置回调if (is_callable($this->eventHandler . '::onConnect')) {$this->_eventOnConnect = $this->eventHandler . '::onConnect';}if (is_callable($this->eventHandler . '::onMessage')) {$this->_eventOnMessage = $this->eventHandler . '::onMessage';} else {echo "Waring: {$this->eventHandler}::onMessage is not callable\n";}if (is_callable($this->eventHandler . '::onClose')) {$this->_eventOnClose = $this->eventHandler . '::onClose';}// 如果Register服务器不在本地服务器,则需要保持心跳if (strpos($this->registerAddress, '127.0.0.1') !== 0) {Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingRegister'));}
}

通过connectToRegister方法,发送数据到Register进程,连接服务注册中心

public function connectToRegister()
{$this->_registerConnection = new AsyncTcpConnection("text://{$this->registerAddress}");$this->_registerConnection->send('{"event":"worker_connect","secret_key":"' . $this->secretKey . '"}');$this->_registerConnection->onClose   = array($this, 'onRegisterConnectionClose');$this->_registerConnection->onMessage = array($this, 'onRegisterConnectionMessage');$this->_registerConnection->connect();
}

3 Register服务收到Gateway的注册后,把所有的Gateway的通讯地址保存在内存中

在Register类中,重写了run方法,定义了当前的

     $this->onConnect = array($this, 'onConnect');// 设置 onMessage 回调$this->onMessage = array($this, 'onMessage');// 设置 onClose 回调$this->onClose = array($this, 'onClose');

三个属性,当Register启动的进程收到消息时,会触发onMessage方法

 public function onMessage($connection, $buffer)
{// 删除定时器Timer::del($connection->timeout_timerid);$data       = @json_decode($buffer, true);if (empty($data['event'])) {$error = "Bad request for Register service. Request info(IP:".$connection->getRemoteIp().", Request Buffer:$buffer). See http://wiki.workerman.net/Error4 for detail";Worker::log($error);return $connection->close($error);}$event      = $data['event'];$secret_key = isset($data['secret_key']) ? $data['secret_key'] : '';// 开始验证switch ($event) {// 是 gateway 连接case 'gateway_connect':if (empty($data['address'])) {echo "address not found\n";return $connection->close();}if ($secret_key !== $this->secretKey) {Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));return $connection->close();}$this->_gatewayConnections[$connection->id] = $data['address'];$this->broadcastAddresses();break;// 是 worker 连接case 'worker_connect':if ($secret_key !== $this->secretKey) {Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));return $connection->close();}$this->_workerConnections[$connection->id] = $connection;$this->broadcastAddresses($connection);break;case 'ping':break;default:Worker::log("Register unknown event:$event IP: ".$connection->getRemoteIp()." Buffer:$buffer. See http://wiki.workerman.net/Error4 for detail");$connection->close();}
}

当$event = ‘gateway_connect’时,是Gateway发来的注册消息,保存到$this->_gatewayConnections数组中,在通过broadcastAddresses方法将当前$this->_gatewayConnections中所有的Gatewat通讯地址转发给所有BusinessWorker进程

4 Register服务收到BusinessWorker的注册后,把内存中所有的Gateway的通讯地址发给BusinessWorker

同第3步中,Register类收到BusinessWorker的注册时,会触发onMessage方法中的worker_connect,case选项。

image.png

同时,将当前worker连接加入到$_workerConnections数组中,在通过broadcastAddresses方法将当前$this->_gatewayConnections中所有的Gatewat通讯地址转发给所有BusinessWorker进程。

5 BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway

在BusinessWoker类的启动中,通过重写run方法,定义的启动onWorkerStart方法中,通过connectToRegister方法注册服务中心的同时,也定义了onMessage匿名函数,用于接收消息回调。

$this->_registerConnection->onMessage = array($this, 'onRegisterConnectionMessage');

即,当注册中心发来消息时候,回调到此处

 public function onRegisterConnectionMessage($register_connection, $data)
{$data = json_decode($data, true);if (!isset($data['event'])) {echo "Received bad data from Register\n";return;}$event = $data['event'];switch ($event) {case 'broadcast_addresses':if (!is_array($data['addresses'])) {echo "Received bad data from Register. Addresses empty\n";return;}$addresses               = $data['addresses'];$this->_gatewayAddresses = array();foreach ($addresses as $addr) {$this->_gatewayAddresses[$addr] = $addr;}$this->checkGatewayConnections($addresses);break;default:echo "Receive bad event:$event from Register.\n";}
}

其中Register类发来的数据是

$data   = array('event'     => 'broadcast_addresses','addresses' => array_unique(array_values($this->_gatewayConnections)),);

这个时候,就会通过checkGatewayConnections方法检查gateway的这些通信端口是否都已经连接,在通过tryToConnectGateway方法尝试连接gateway的这些内部通信地址

6 Gateway进程收到BusinessWorker进程的连接消息

同样,在Gateway进程启动的时候,触发的onWorkerStart方法中,也定义了一个内部通讯的onWorkerMessage

$this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');

由此来接收BusinessWorker进程发来的连接消息,部分代码

public function onWorkerMessage($connection, $data)
{$cmd = $data['cmd'];if (empty($connection->authorized) && $cmd !== GatewayProtocol::CMD_WORKER_CONNECT && $cmd !== GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT) {self::log("Unauthorized request from " . $connection->getRemoteIp() . ":" . $connection->getRemotePort());return $connection->close();}switch ($cmd) {// BusinessWorker连接Gatewaycase GatewayProtocol::CMD_WORKER_CONNECT:$worker_info = json_decode($data['body'], true);if ($worker_info['secret_key'] !== $this->secretKey) {self::log("Gateway: Worker key does not match ".var_export($this->secretKey, true)." !== ". var_export($this->secretKey));return $connection->close();}$key = $connection->getRemoteIp() . ':' . $worker_info['worker_key'];// 在一台服务器上businessWorker->name不能相同if (isset($this->_workerConnections[$key])) {self::log("Gateway: Worker->name conflict. Key:{$key}");$connection->close();return;}$connection->key = $key;$this->_workerConnections[$key] = $connection;$connection->authorized = true;return;// GatewayClient连接Gateway

将worker的进程连接保存到$this->_workerConnections[$key] = $connection;

7 Gateway进程收到客户端的连接,消息时,会通过Gateway转发给worker处理

 // Gateway类的run方法中定义此属性$this->onMessage = array($this, 'onClientMessage');// 收到客户端消息的时候出发此函数public function onClientMessage($connection, $data){$connection->pingNotResponseCount = -1;$this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $connection, $data);}

在sendToWorker方法中,将数据发给worker进程处理

深入理解GatewayWorker框架相关推荐

  1. 使用GatewayWorker框架,多个workerman进程(businessworker)负载不均衡的问题解决过程

    公司搭建一套智慧社区.智能对讲管控云平台时,使用GatewayWorker框架搭建app端外推送的服务.发现性能比预期的低.也就是GatewayWorker(https://github.com/wa ...

  2. 从Java的角度理解前端框架,nodejs,reactjs,angularjs,requirejs,seajs

    从Java的角度理解前端框架,nodejs,reactjs,angularjs,requirejs,seajs [前端神秘的面纱] 对后端开发来说,前端是神秘的, 眼花缭乱的技术,繁多的框架, 如果你 ...

  3. 深入理解 Flutter 框架层次结构

    作者: Frederik Schweiger 链接 : The Layer Cake Flutter 是一个非常优秀的跨平台开发框架,基于 Flutter 我们可以用很少的代码快速的开发出界面精美的 ...

  4. Understanding the Framework (理解FMS框架)

    FMS Framework 由 600 行没有文档的代码组成,毫无疑问很多开发者会开发自己的框架. 在本章,load几个.asc 文件,就在application中使用load()命令.比如: mai ...

  5. Windows 8实例教程系列 - 理解应用框架

    Windows 操作系统之所以风靡世界,是因为其"易学易用",从用户的角度出发,让数以万计的非IT人员使用计算机实现娱乐,工作等目的.Windows 8继承Windows桌面的优点 ...

  6. spring框架_一篇文章带你理解Spring框架

    虽然现在流行用SpringBoot了,很多配置已经简化和封装了,但是对于Spring的一些基础我们了解一些是对我们自己的架构思想很有帮助的!接下来和笔者一起来探讨一下Spring框架吧! 1.什么是S ...

  7. aop实现原理_从宏观的实现原理和设计本质入手,带你理解 AOP 框架的原理

    点击上方"Java知音",选择"置顶公众号" 技术文章第一时间送达! 作者:FeelsChaotic juejin.im/post/5c57b2d5e51d45 ...

  8. php 注入是什么意思,如何理解ThinkPHP框架里的依赖注入?

    依赖注入,你可以分开理解.拆成,依赖和注入 依赖:就是你现在要用 request 对象的get 方法,所有你必须要先能得到request对象,然后才能使用这个方法.这个就是依赖. 注入:怎么得到req ...

  9. Spring深入理解-Spring框架设计理念

    导语   Spring框架作为企业中最常用的框架,是为了降低企业级开发的复杂性,现在经过发展它可以做的事情也是越来越多了.但是尽管Spring家族的东西已经越来越多,越来越健全,但是它的核心理念是不变 ...

  10. 从宏观的实现原理和设计本质入手,带你理解 AOP 框架的原理

    点击上方"Java知音",选择"置顶公众号" 技术文章第一时间送达! 作者:FeelsChaotic juejin.im/post/5c57b2d5e51d45 ...

最新文章

  1. javacript实现不被浏览器拦截打开新窗口
  2. windows下使用curl以及常用curl命令
  3. macbook配置java环境变量_Mac系统配置JDK环境变量
  4. 在加利福尼亚州投资于新餐馆:一种数据驱动的方法
  5. C++ 基类和派生类的析构函数
  6. 痞子衡嵌入式:飞思卡尔i.MX RTyyyy系列MCU特性介绍(3)- 命名规则
  7. HttpServletRequest应用-获取请求参数及解决中文乱码
  8. matlab算sma,MA-EMA-SMA-DMA 函数用法、算法、图例、区别
  9. Win10笔记本电脑如何开启热点
  10. 解决Tomcat version 7.0 only supports J2EE 1.2, 1.3, 1.4, and Java EE 5 and 6 Web modules
  11. [JZOJ100026]图--倍增
  12. 22. Magento 创建新闻模块(3)
  13. c语言定义int变量 故意输入字母,怎样用C语言输入一段文本,以什么作为输入终止的标志?又怎样才能按输入的格式输出该文本?...
  14. html ul动态添加li,javaScript动态添加Li元素
  15. 论Python常见的内置模块
  16. 程序员的这些前五大无奈,你占了多少?
  17. HTML——3D移动、3D透视、3D效果、3D呈现案例效果
  18. SVG格式文件插入Word/WPS,三种简单快捷的方法,实现图片高清无损
  19. javascript显示本地服务器图片,JavaScript图片本地预览功能的实现方法
  20. Socket网络编程精讲

热门文章

  1. dgen模拟器 linux,怎么样在Linux/FreeBSD下玩模拟器游戏
  2. Mac Spotlight 聚焦搜索
  3. 边境的悍匪—机器学习实战:第十三章 使用TensorFlow加载和预处理数据
  4. 阿里云服务器安全组放行宝塔端口8888|888|80|443|20|21教程
  5. Word中如何修改脚注的编号方式
  6. 英魂之刃服务器8.8维护,英魂之刃8大联动!每一个都是良心之作!玩家:公开打脸同行?...
  7. 曾被尊称为“教父级”人物的郭盛华,现在到底怎么样了?
  8. TP5.1升级到ThinkPHP6.0的实战教程看云版
  9. 让机器耳濡目染:MIT提出跨模态机器学习模型
  10. 解决RabbitMQ无法使用guest用户登录问题