egg-cluster 是什么

为了将多核 CPU 的性能发挥到极致,最大程度地榨干服务器资源,egg 采用多进程模型,解决了一个 Node.js 进程只能运行在一个 CPU 上的问题,egg-cluster 是用于 egg 多进程管理的基础模块,负责底层的 IPC 通道的建立以及处理各进程的通信

Egg 多进程模型

  • master 主进程
  • worker master 的子进程,一般是根据服务器有多少个 CPU 启动多少个这样的 worker 进程,主要用于对外服务,处理各种业务层面的事情
  • agent master 的子进程,主要处理公共资源的访问,如文件监听,或者帮 worker 处理一些公共事务,如一些事情是不需要每个 worker 都做一次的,agent 帮忙做完之后通知它们执行之后的操作

master 类似于一个守护进程的存在:

  • 负责 agent 的启动、退出、重启
  • 负责各个 worker 进程的启动、退出、以及 refork ,在开发模式下负责重启
  • 负责 agent 和各个 worker 之间的通信
  • 负责各个 worker 之间的通信

各进程的启动顺序:

  • master 启动后先启动 agent 进程
  • agent 初始化成功后,通过 IPC 通道通知 master
  • master 根据 CPU 的个数启动相同数目的 worker 进程
  • worker 进程初始化成功后,通过 IPC 通道通知 master
  • 所有的进程初始化成功后,master 通知 agent 和各个 worker 进程应用启动成功

启动方式差异:

从上图可以看出,master 启动 agent 和 worker 的方式明显不一样,启动 agent 使用的是 child_process 的 fork 模式,启动各个 worker 使用的是 cluster 的 fork 模式,为什么不能都使用同一种方式来启动?因为它们所负责处理的事情性质是不一样的,agent 是类似于作为各个 worker 秘书的存在,只负责帮它们处理轻量级的服务,是不直接对外提供 http 访问的,所以 master 用 cluster.fork 把各个 worker 启动起来,并提供对外 http 访问,这些 worker 在 cluster 的预处理下能够对同一端口进行监听而不会产生端口冲突,同时使用 round-robin 策略进行负载均衡把收到的 http 请求合理地分配给各个 worker 进行处理

进程间通信:

master 和 agent/worker 是 real communication,agent 和 worker 之间以及各个 worker 之间是 virtual communication

  • master 继承了 events 模块,拥有 events 监听、发送消息的能力,master 进程自身是通过订阅者模式来进行事务处理的,所以在 master 的源码里面并没有看到过多的 callback hell
  • master 是 agent 的父进程,相互可以通过 IPC 通道进行通信
  • master 是 worker 的父进程,相互可以通过 IPC 通道进行通信
  • agent 和各个 worker 之间毕竟是不同进程,是无法直接进行通信的,所以需要借助master 的力量进行转发,egg-cluster 封装了一个 messenger 的工具类,对各个进程间消息转发进行了封装
  • 各个 worker 之间由于是不同进程,也是无法直接进行通信的,需要借助 master 的力量进行转发,原理同上

各进程的状态通知:

  • worker 启动成功后 master 会对其状态进行监听,可以第一时间感知到失联的 worker,在这情况下 master 会对这些 worker 之前所绑定的事件进行销毁防止内存泄露,并且通知 agent,最后 refork 出同等数量的 worker 保证业务的顺利进行,对 worker 的 fork 和 refork 操作都是通过工具类 cfork 进行的
  • agent 启动成功后 master 会对其状态进行监听,对于退出或者失联的 agent master 是清楚的,在这情况下 master 会对这些 agent 之前所绑定的事件进行销毁防止内存泄露,并且通知各个 worker,最后重启 agent 进程保证业务的顺利进行
  • master 退出了或者失联了,worker 怎么办?不用担心,cluster 已经做好了这样的处理,当父进程退出后子进程自动退出
  • master 退出了或者失联了,agent 也像 worker 一样退出吗?然而并不是!这是child_process.fork 和 cluster.fork 的不同之处,master 退出了或者失联了,agent 进程还继续运行,但是它的父进程已经不在了,它将会被 init 进程收养,从而成为孤儿进程,当这样的孤儿进程越来越多的时候服务器就会越来越卡。所以 master 退出后需要指定 agent 也一起退出!关于优雅推出,官方写了个 gracefull-process 模块,详细请看 eggjs-feed-03

开发模式

开发模式下 通过 egg-development + egg-watcher 模块监听相关文件的改动,然后 egg-cluster 模块负责进行重启操作

Talk is cheap. Show me the code

准备工作

写这篇文章的时候 egg-cluster 社区版最新版是 1.8.0 ,下面的内容以该版本为准

读源码前需要理解两个模块的作用:

  • messenger,负责 master,agent,worker IPC 通信的消息转发
  • cfork,负责 worker 的启动,状态监听以及 refork 操作

egg 是通过 index.js 作为入口文件进行启动的,输入以下代码然后就可以成功启动了

const egg = require('egg');
egg.startCluster(options, () => {console.log('started');
});

PS: 新版本的 Egg 现在不推荐 index.js 启动了,而是用 egg-bin dev 和 egg-scripts start

入口文件代码如此简单,那 egg 底层做了些什么?比如 egg.startCluster 这个方法里面做了些什么?

exports.startCluster = require('egg-cluster').startCluster;

原来 egg.startCluster 是 egg-cluster 模块暴露的一个 API

// egg-cluster/index.js
const Master = require('./lib/master');
exports.startCluster = function(options, callback) {new Master(options).ready(callback);
};

startCluster 主要做了这些事情:

  • 启动 master 进程
  • egg 启动成功后执行 callback 方法,比如希望在 egg 启动成功后执行一些初始化操作,但是不应该做业务上的初始化操作,master 进程不应该有业务逻辑,代码越精简越好;业务上的初始化操作应该在 app.js / agent.js 里面的 beforeStart 进行

Master(egg-cluster/lib/master.js)

// Master 继承了 events 模块,拥有 events 监听、发送消息的能力
class Master extends EventEmitter {}

Master#constructor

constructor 里面大致可以分为5个部分:

constructor(options) {super();this.options = parseOptions(options);// new 一个 Messenger 实例this.messenger = new Messenger(this);// 借用 ready 模块的方法ready.mixin(this);this.isProduction = isProduction();this.isDebug = isDebug();......// 根据不同运行环境(local、test、prod)设置日志输出级别this.logger = new ConsoleLogger({ level: process.env.EGG_MASTER_LOGGER_LEVEL || 'INFO' });...
}
// master 启动成功后通知 parent、app worker、agent
this.ready(() => {this.isStarted = true;const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';this.logger.info('[master] %s started on %s://127.0.0.1:%s (%sms)%s',frameworkPkg.name, this.options.https ? 'https' : 'http',this.options.port, Date.now() - startTime, stickyMsg);const action = 'egg-ready';this.messenger.send({ action, to: 'parent' });this.messenger.send({ action, to: 'app', data: this.options });this.messenger.send({ action, to: 'agent', data: this.options });
});
// 监听 agent 退出
this.on('agent-exit', this.onAgentExit.bind(this));
// 监听 agent 启动
this.on('agent-start', this.onAgentStart.bind(this));
// 监听 app worker 退出
this.on('app-exit', this.onAppExit.bind(this));
// 监听 app worker 启动
this.on('app-start', this.onAppStart.bind(this));
// 开发环境下监听 app worker 重启
this.on('reload-worker', this.onReload.bind(this));// 监听 agent 启动,注意这里只执行一次
this.once('agent-start', this.forkAppWorkers.bind(this));
// master监听自身的退出及退出后的处理// kill(2) Ctrl-C     监听 SIGINT 信号
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\     监听 SIGQUIT 信号
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default   监听 SIGTERM 信号
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));// 监听 exit 事件
process.once('exit', this.onExit.bind(this));
// 监听端口冲突
detectPort((err, port) => {/* istanbul ignore if */if (err) {err.name = 'ClusterPortConflictError';err.message = '[master] try get free port error, ' + err.message;this.logger.error(err);process.exit(1);return;}this.options.clusterPort = port;this.forkAgentWorker(); // 如果端口没有冲突则执行该方法
});

Master#forkAgentWorker

master 进程以 child_process 模式启动 agent 进程

forkAgentWorker() {this.agentStartTime = Date.now();const args = [ JSON.stringify(this.options) ];const opt = { execArgv: process.execArgv.concat([ '--debug-port=5856' ]) };// 以 child_process.fork 模式启动 agent worker,此时 agent 成为 master 的子进程const agentWorker = this.agentWorker = childprocess.fork(agentWorkerFile, args, opt);// 记录 agent 的 idagentWorker.id = ++this.agentWorkerIndex;this.log('[master] agent_worker#%s:%s start with clusterPort:%s',agentWorker.id, agentWorker.pid, this.options.clusterPort);// master 监听从 agent 发送给 master 的消息, 并打上消息来源 (msg.from = 'agent')// 将消息通过 messenger 发送出去agentWorker.on('message', msg => {if (typeof msg === 'string') msg = { action: msg, data: msg };msg.from = 'agent';this.messenger.send(msg);});// master 监听 agent 的异常,并打上对应的 log 信息方便问题排查agentWorker.on('error', err => {err.name = 'AgentWorkerError';err.id = agentWorker.id;err.pid = agentWorker.pid;this.logger.error(err);});// master 监听 agent 的退出// 并通过 messenger 发送 agent 的 'agent-exit' 事件给 master// 告诉 master 说 agent 退出了agentWorker.once('exit', (code, signal) => {this.messenger.send({action: 'agent-exit',data: { code, signal },to: 'master',from: 'agent',});});
}

到这里,agent worker 已完成启动,并且 master 对其进行监听,这里有个疑问

agent 启动成功后是如何通知 master 进行下一步操作的?

const agentWorker = this.agentWorker = childprocess.fork(agentWorkerFile, args, opt);

以 child_process.fork 模式启动 agent worker,读取的是 agent_worker.js,截取里面的一段代码

// egg-cluster/lib/agent_worker.jsagent.ready(() => {agent.removeListener('error', startErrorHandler);process.send({ action: 'agent-start', to: 'master' });
});

agent 启动成功后调用 process.send() 通知 master,master 监听到该消息通过 messenger 转发出去

// Master#forkAgentWorker
agentWorker.on('message', msg => {if (typeof msg === 'string') msg = { action: msg, data: msg };msg.from = 'agent';this.messenger.send(msg);
});

最终由 master 进行 agent-start 事件的响应

// Master#constructor
...
...
this.on('agent-start', this.onAgentStart.bind(this));
...
this.once('agent-start', this.forkAppWorkers.bind(this));
...
...

Master#onAgentStart

agent 启动后的操作

onAgentStart() {// agent 启动成功后向 app worker 发送 'egg-pids' 事件并带上 agent pidthis.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });// 向 app worker 发送 'agent-start' 事件this.messenger.send({ action: 'agent-start', to: 'app' });this.logger.info('[master] agent_worker#%s:%s started (%sms)',this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
}

值得注意的是此时 app worker 还没启动,所以该消息会被丢弃,后续如果发生 agent 重启的情况会被 app worker 监听到

Master#forkAppWorkers

master 进程以 cluster 模式启动 app worker 进程

forkAppWorkers() {this.appStartTime = Date.now();this.isAllAppWorkerStarted = false;this.startSuccessCount = 0;this.workers = new Map();const args = [ JSON.stringify(this.options) ];this.log('[master] start appWorker with args %j', args);// 以 cluster 模式启动 app worker 进程cfork({exec: appWorkerFile,args,silent: false,count: this.options.workers,// 在开发环境下不会进行 refork,方便排查问题refork: this.isProduction,});// master 监听各个 app worker 进程的消息cluster.on('fork', worker => {this.workers.set(worker.process.pid, worker);worker.on('message', msg => {if (typeof msg === 'string') msg = { action: msg, data: msg };msg.from = 'app';this.messenger.send(msg);});this.log('[master] app_worker#%s:%s start, state: %s, current workers: %j',worker.id, worker.process.pid, worker.state, Object.keys(cluster.workers));});// master 监听各个 app worker 进程的 disconnect 事件并记录到 logcluster.on('disconnect', worker => {this.logger.info('[master] app_worker#%s:%s disconnect, suicide: %s, state: %s, current workers: %j',worker.id, worker.process.pid, worker.exitedAfterDisconnect, worker.state, Object.keys(cluster.workers));});// master 监听各个 app worker 进程的 exit 事件,并向 master 发送 'app-exit' 事件,将 app worker 退出后的事情交给 master 处理cluster.on('exit', (worker, code, signal) => {this.messenger.send({action: 'app-exit',data: { workerPid: worker.process.pid, code, signal },to: 'master',from: 'app',});});// master 监听各个 app worker 进程的 listening 事件,表示各个 app worker 已经可以开始工作了cluster.on('listening', (worker, address) => {this.messenger.send({action: 'app-start',data: { workerPid: worker.process.pid, address },to: 'master',from: 'app',});});
}

app worker 启动后,跟 agent 一样,通过 messenger 发 app-start 事件发送给 master,由 master 继续处理

// Master#constructor...
...
this.on('app-start', this.onAppStart.bind(this));
...
...

Master#onAppStart

app worker 启动后的操作

onAppStart(data) {const worker = this.workers.get(data.workerPid);...// app worker 启动成功后通知 agentthis.messenger.send({action: 'egg-pids',to: 'agent',data: getListeningWorker(this.workers),});...// app worker 准备好了if (this.options.sticky) {this.startMasterSocketServer(err => {if (err) return this.ready(err);this.ready(true);});} else {this.ready(true);}
}

这时 agent 和各个 app worker 已经 ready 了,master 也可以做好准备了,执行 ready 后的操作,把 egg-ready 事件发送给 parent、app、agent,告诉它们已经 ready 了,可以开始干活

this.ready(() => {...const action = 'egg-ready';this.messenger.send({ action, to: 'parent' });this.messenger.send({ action, to: 'app', data: this.options });this.messenger.send({ action, to: 'agent', data: this.options });
});

Master#onAgentExit

agent 退出后的处理

onAgentExit(data) {...// 告诉各个 app worker,agent 退出了this.messenger.send({ action: 'egg-pids', to: 'app', data: [] });...// 记录异常信息,方便问题排查const err = new Error(util.format('[master] agent_worker#%s:%s died (code: %s, signal: %s)',agentWorker.id, agentWorker.pid, data.code, data.signal));err.name = 'AgentWorkerDiedError';this.logger.error(err);// 移除事件监听,防止内存泄露agentWorker.removeAllListeners();...// 把 'agent-worker-died' 通知 parent 进程后重启 agent 进程this.log('[master] try to start a new agent_worker after 1s ...');setTimeout(() => {this.logger.info('[master] new agent_worker starting...');this.forkAgentWorker();}, 1000);this.messenger.send({action: 'agent-worker-died',to: 'parent',});
}

Master#onAppExit

app worker 退出后的处理

onAppExit(data) {...// 记录异常信息,方便问题排查if (!worker.isDevReload) {const signal = data.code;const message = util.format('[master] app_worker#%s:%s died (code: %s, signal: %s, suicide: %s, state: %s), current workers: %j',worker.id, worker.process.pid, worker.process.exitCode, signal,worker.exitedAfterDisconnect, worker.state,Object.keys(cluster.workers));const err = new Error(message);err.name = 'AppWorkerDiedError';this.logger.error(err);}// 移除事件监听,防止内存泄露worker.removeAllListeners();this.workers.delete(data.workerPid);// 发送 'egg-pids' 事件给 agent,告诉它目前处于 alive 状态的 app worker pidthis.messenger.send({ action: 'egg-pids', to: 'agent', data: getListeningWorker(this.workers) });// 发送 'app-worker-died' 的消息给 parent 进程this.messenger.send({action: 'app-worker-died',to: 'parent',});
}

Master#onReload

开发模式下监听文件的改动,对 app worker 进行重启操作

  • 开发模式下开启 egg-development 插件,对相关文件进行监听,监听到有文件改动的话向 master 发送 reload-worker 事件
process.send({to: 'master',action: 'reload-worker',
});
  • master 通过监听 reload-worker 事件后执行 onReload 方法
this.on('reload-worker', this.onReload.bind(this));
  • onReload 通过 cluster-reload 模块进行重启操作
onReload() {this.log('[master] reload workers...');for (const id in cluster.workers) {const worker = cluster.workers[id];worker.isDevReload = true;}require('cluster-reload')(this.options.workers);
}

Master#onExit

master 退出后的处理,该方法主要是打相关的 log

Master#onSignal和Master#close

测试的时候,master 对收到的各个系统 signal 进行响应

// kill(2) Ctrl-C
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
  • 杀死各个 app worker 进程
  • 杀死 agent 进程
  • 退出 master 进程
close() {this.closed = true;this.killAppWorkers();this.killAgentWorker();this.log('[master] close done, exiting with code:0');process.exit(0);
}

转自https://zhuanlan.zhihu.com/p/29374045?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io

Egg 源码解析之 egg-cluster相关推荐

  1. Flink 全网最全资源(视频、博客、PPT、入门、原理、实战、性能调优、源码解析、问答等持续更新)

    Flink 学习 https://github.com/zhisheng17/flink-learning 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧 ...

  2. Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)...

    Flink 学习 github.com/zhisheng17/- 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧! 本项目结构 博客 1.Flink 从0 ...

  3. Flink 源码解析 —— 源码编译运行

    更新一篇知识星球里面的源码分析文章,去年写的,周末自己录了个视频,大家看下效果好吗?如果好的话,后面补录发在知识星球里面的其他源码解析文章. 前言 之前自己本地 clone 了 Flink 的源码,编 ...

  4. Egg 源码分析之 egg-core

    我们团队现在开发的node项目都是基于koa框架实现的,虽然现在也形成了一套团队内的标准,但是在开发的过程中也遇到了一些问题: 由于没有统一的规范,新人上手和沟通成本比较高,容易出现错误 仅局限于目前 ...

  5. dubbo源码解析(三十五)集群——cluster

    集群--cluster 目标:介绍dubbo中集群容错的几种模式,介绍dubbo-cluster下support包的源码. 前言 集群容错还是很好理解的,就是当你调用失败的时候所作出的措施.先来看看有 ...

  6. 设计模式 结构型模式 -- 装饰者模式(概述 快餐店案例 模式优点 使用场景 源码解析 BufferedWriter 和代理模式的区别)

    1. 装饰者模式 1.1 概述 我们先来看一个快餐店的例子: 快餐店有炒面.炒饭这些快餐,可以额外附加鸡蛋.火腿.培根这些配菜,当然加配菜需要额外加钱,每个配菜的价钱通常不太一样,那么计算总价就会显得 ...

  7. 谷歌BERT预训练源码解析(三):训练过程

    目录 前言 源码解析 主函数 自定义模型 遮蔽词预测 下一句预测 规范化数据集 前言 本部分介绍BERT训练过程,BERT模型训练过程是在自己的TPU上进行的,这部分我没做过研究所以不做深入探讨.BE ...

  8. Redis源码解析——前言

    今天开启Redis源码的阅读之旅.对于一些没有接触过开源代码分析的同学来说,可能这是一件很麻烦的事.但是我总觉得做一件事,不管有多大多难,我们首先要在战略上蔑视它,但是要在战术上重视它.除了一些高大上 ...

  9. 【特征匹配】ORB原理与源码解析

    相关 : Fast原理与源码解析 Brief描述子原理与源码解析 Harris原理与源码解析 http://blog.csdn.net/luoshixian099/article/details/48 ...

最新文章

  1. 「BATJ面试系列」并发编程
  2. android 颜色填充工具栏,Android工具栏颜色未由colorPrimary设置
  3. python3命令需要使用命令行开发者工具_关于Python3的import问题(pycharm可以运行命令行import错误)...
  4. 链表之删除单链表倒数第K个节点
  5. 20190626_二次开发BarTender打印机_C#代码_一边读取TID_一边打印_打印机POSTEK
  6. Re-attention机制Transformer,实现强大性能
  7. 不采用服务器虚拟化的优缺点,为什么要进行虚拟化部署?虚拟化的缺点是什么?...
  8. 大数乘法与大数加法 java实现
  9. 解决: Tomcat 启动项目没问题,访问网页页面出现空白无显示
  10. 疑问点sqlilibs 第一关
  11. 职场上被人针对要不要告诉领导
  12. 通过Python绘制九种二次曲面
  13. 大一计算机论文_大一计算机论文大纲模板范文 大一计算机论文提纲怎样写
  14. 云主机、云服务器、VPS的区别性能比较
  15. 读书笔记:《不抱怨的世界》
  16. C语言字符5,C语言字符数据(4、5).doc
  17. Hive的安装与配置
  18. 头像叠加android_Android开发头像挨着叠加效果
  19. 杜克大学电子与计算机工程系,杜克大学电子与计算机工程系
  20. 使用WORD宏实现查找带格式的文字并复制到特定地方

热门文章

  1. python爬虫进阶-大众点评店铺信息(字体反爬-静态映射)
  2. UI非常漂亮的数诚1对1直播/带收徒/带公会/运营版本
  3. CI3.* 创建公共方法
  4. 3.4nbsp;企业家——3.4.1nbsp;史蒂…
  5. 牧牛生态科技,区块链赋能港口供应链
  6. 在C 用GDI+实现图形图像的任意变形效果
  7. 获取12306的车次与单价的接口
  8. python中插件离线下载
  9. android开发新浪微博客户端
  10. 【高等数学基础进阶】常微分方程-part2