先用一张图来说明dubbo中的monitor模块结构:

基于Filter来实现服务调用监控功能

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MonitorFilter implements Filter {private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();private MonitorFactory monitorFactory;public void setMonitorFactory(MonitorFactory monitorFactory) {this.monitorFactory = monitorFactory;}// 调用过程拦截public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { //1处RpcContext context = RpcContext.getContext(); // 提供方必须在invoke()之前获取context信息long start = System.currentTimeMillis(); // 记录起始时间戮getConcurrent(invoker, invocation).incrementAndGet(); // 并发计数try {Result result = invoker.invoke(invocation); // 让调用链往下执行collect(invoker, invocation, result, context, start, false); //4处return result;} catch (RpcException e) {collect(invoker, invocation, null, context, start, true);throw e;} finally {getConcurrent(invoker, invocation).decrementAndGet(); // 并发计数}} else {return invoker.invoke(invocation);}}
}

在上面代码的1处中,只有打开了监控开关时,即设置Constants.MONITOR_KEY参数时,才会进行有监控功能。

通过源码com.alibaba.dubbo.config.ServiceConfig<T>的方法

doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)来看看监控功能:

if (registryURLs != null && registryURLs.size() > 0&& url.getParameter("register", true)) {for (URL registryURL : registryURLs) {url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));URL monitorUrl = loadMonitor(registryURL); //2处if (monitorUrl != null) {url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); //3处}if (logger.isInfoEnabled()) {logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);}Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));Exporter<?> exporter = protocol.export(invoker);exporters.add(exporter);}
} else {Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);Exporter<?> exporter = protocol.export(invoker);exporters.add(exporter);
}

在上面源码的2处,会加载标签<dubbo:monitor protocol="dubbo" address="127.0.0.1:7070" />的内容,
如果应用设置<dubbo:monitor/>标签则,会设置URL的Constants.MONITOR_KEY属性,则在上面的MonitorFilter过滤器就会打开监控功能。

在MonitorFilter的4处,会调用到

com.alibaba.dubbo.monitor.dubbo.DubboMonitor.collect(..)方法,DubboMonitor实现了MonitorService的collect(..)方法,

方法:collect(URL statistics),可以看看传参URL对象知道统计的信息是哪些:

// 信息采集
private void collect(Invoker<?> invoker, Invocation invocation, Result result, RpcContext context, long start, boolean error) {try {// ---- 服务信息获取 ----long elapsed = System.currentTimeMillis() - start; // 计算调用耗时int concurrent = getConcurrent(invoker, invocation).get(); // 当前并发数String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);String service = invoker.getInterface().getName(); // 获取服务名称String method = RpcUtils.getMethodName(invocation); // 获取方法名URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY);Monitor monitor = monitorFactory.getMonitor(url);int localPort;String remoteKey;String remoteValue;if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) {// ---- 服务消费方监控 ----context = RpcContext.getContext(); // 消费方必须在invoke()之后获取context信息localPort = 0;remoteKey = MonitorService.PROVIDER;remoteValue = invoker.getUrl().getAddress();} else {// ---- 服务提供方监控 ----localPort = invoker.getUrl().getPort();remoteKey = MonitorService.CONSUMER;remoteValue = context.getRemoteHost();}String input = "", output = "";if (invocation.getAttachment(Constants.INPUT_KEY) != null) {input = invocation.getAttachment(Constants.INPUT_KEY);}if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) {output = result.getAttachment(Constants.OUTPUT_KEY);}//A处monitor.collect(new URL(Constants.COUNT_PROTOCOL,NetUtils.getLocalHost(), localPort,service + "/" + method,MonitorService.APPLICATION, application,MonitorService.INTERFACE, service,MonitorService.METHOD, method,remoteKey, remoteValue,error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",MonitorService.ELAPSED, String.valueOf(elapsed),MonitorService.CONCURRENT, String.valueOf(concurrent),Constants.INPUT_KEY, input,Constants.OUTPUT_KEY, output));} catch (Throwable t) {logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);}
}

看看上面源码的“A处”,创建的URL对象,
protocol:count
host:[服务模块的ip]
port:[服务模块的端口]
application:[服务模块的应用名]
interface:[接口名]
method:[方法名]
provider:[提供者的ip] //如果服务模块角色是消息者,则会标识对方的角色是提供者,否则;不会有这个统计项
consumer:[消息者的ip] //如果服务模块角色是提供者,则会标识对方的角色是消息者,否则;不会有这个统计项
failure:1 //服务调用失败的次数为1
success:1 //服务调用成功的次数为1
elapsed:[服务调用耗时,这个是实际调用时长,单位毫秒]
concurrent:[服务的并发数,即是同时调用了该方法,还没结果返回。]
input:[未知]
output:[未知]

这样可以将dubbo的服务调用统计数据暂时保存到DubboMonitor的ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap中,

com.alibaba.dubbo.monitor.dubbo.DubboMonitor.collect(URL)源码:

public void collect(URL url) {// 读写统计变量int success = url.getParameter(MonitorService.SUCCESS, 0);int failure = url.getParameter(MonitorService.FAILURE, 0);int input = url.getParameter(MonitorService.INPUT, 0);int output = url.getParameter(MonitorService.OUTPUT, 0);int elapsed = url.getParameter(MonitorService.ELAPSED, 0);int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);// 初始化原子引用Statistics statistics = new Statistics(url);AtomicReference<long[]> reference = statisticsMap.get(statistics);if (reference == null) {statisticsMap.putIfAbsent(statistics, new AtomicReference<long[]>());reference = statisticsMap.get(statistics);}// CompareAndSet并发加入统计数据long[] current;long[] update = new long[LENGTH];do {current = reference.get();if (current == null) {update[0] = success;update[1] = failure;update[2] = input;update[3] = output;update[4] = elapsed;update[5] = concurrent;update[6] = input;update[7] = output;update[8] = elapsed;update[9] = concurrent;} else {update[0] = current[0] + success;update[1] = current[1] + failure;update[2] = current[2] + input;update[3] = current[3] + output;update[4] = current[4] + elapsed;update[5] = (current[5] + concurrent) / 2;update[6] = current[6] > input ? current[6] : input;update[7] = current[7] > output ? current[7] : output;update[8] = current[8] > elapsed ? current[8] : elapsed;update[9] = current[9] > concurrent ? current[9] : concurrent;}} while (! reference.compareAndSet(current, update));
}

其中的long[]就保存了各项服务调用统计数据:

[0]:成功总数
[1]:失败总数
[4]:耗时总时长
[5]:并发数
[8]:最大耗时
[9]:最大并发数

DubboMonitor部分源码:

private final ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap = new ConcurrentHashMap<Statistics, AtomicReference<long[]>>();public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {this.monitorInvoker = monitorInvoker;this.monitorService = monitorService;this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);// 启动统计信息收集定时器sendFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {public void run() {// 收集统计信息try {send();} catch (Throwable t) { // 防御性容错logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);}}}, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);}

可以看到会以1分钟的间隔,定时发送监控统计数据到dubbo的监控模块dubbo-monitor-simple

下面看看dubbo-monitor-simple的部分源码:

private final BlockingQueue<URL> queue;public SimpleMonitorService() {queue = new LinkedBlockingQueue<URL>(Integer.parseInt(ConfigUtils.getProperty("dubbo.monitor.queue", "100000")));writeThread = new Thread(new Runnable() {public void run() {while (running) {try {write(); // 记录统计日志 //6处} catch (Throwable t) { // 防御性容错logger.error("Unexpected error occur at write stat log, cause: " + t.getMessage(), t);try {Thread.sleep(5000); // 失败延迟} catch (Throwable t2) {}}}}});writeThread.setDaemon(true);writeThread.setName("DubboMonitorAsyncWriteLogThread");writeThread.start();chartFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {public void run() {try {draw(); // 绘制图表 //7处} catch (Throwable t) { // 防御性容错logger.error("Unexpected error occur at draw stat chart, cause: " + t.getMessage(), t);}}}, 1, 300, TimeUnit.SECONDS);INSTANCE = this;
}public void collect(URL statistics) { //5处queue.offer(statistics);if (logger.isInfoEnabled()) {logger.info("collect statistics: " + statistics);}
}

从5处来看,这方法收到统计数据会先入队列,
然后在6处,会有后台线程不断地将列队的数据写入到本地文件中,

从7处来看,间隔一段时间会从本地文件中取出数据画出图。

在源码“6处”的write方法:

private void write() throws Exception {URL statistics = queue.take();if (POISON_PROTOCOL.equals(statistics.getProtocol())) {return;}String timestamp = statistics.getParameter(Constants.TIMESTAMP_KEY);Date now;if (timestamp == null || timestamp.length() == 0) {now = new Date();} else if (timestamp.length() == "yyyyMMddHHmmss".length()) {now = new SimpleDateFormat("yyyyMMddHHmmss").parse(timestamp);} else {now = new Date(Long.parseLong(timestamp));}String day = new SimpleDateFormat("yyyyMMdd").format(now);SimpleDateFormat format = new SimpleDateFormat("HHmm");for (String key : types) {try {String type;String consumer;String provider;if (statistics.hasParameter(PROVIDER)) {type = CONSUMER;consumer = statistics.getHost();provider = statistics.getParameter(PROVIDER);int i = provider.indexOf(':');if (i > 0) {provider = provider.substring(0, i);}} else {type = PROVIDER;consumer = statistics.getParameter(CONSUMER);int i = consumer.indexOf(':');if (i > 0) {consumer = consumer.substring(0, i);}provider = statistics.getHost();}String filename = statisticsDirectory + "/" + day + "/" + statistics.getServiceInterface() + "/" + statistics.getParameter(METHOD) + "/" + consumer + "/" + provider + "/" + type + "." + key;File file = new File(filename);File dir = file.getParentFile();if (dir != null && ! dir.exists()) {dir.mkdirs();}FileWriter writer = new FileWriter(file, true);try {writer.write(format.format(now) + " " + statistics.getParameter(key, 0) + "\n");writer.flush();} finally {writer.close();}} catch (Throwable t) {logger.error(t.getMessage(), t);}}
}

String[] types = {SUCCESS, FAILURE, ELAPSED, CONCURRENT, MAX_ELAPSED, MAX_CONCURRENT};
数据保存到文件:statistics/[yyyyMMdd]/[接口名]/[方法名]/[消费者ip]/[提供者ip]/[type的值"provider"或者"consumer"].[tyeps数组里值]
文件内容保存每一分钟的值,如consumer.success文件的内容:
1313 5
1314 0
1315 0
保存了消费者分别在13时13、14、15分钟调用成功的次数。

自己写了个RPC:

https://github.com/nytta

可以给个star,^0^.

(九)监控模块-monitor相关推荐

  1. 2021年的“金九银十”你准备好了吗?,腾讯、字节、百度、阿里、快手等一线互联网公司面试真题分享

    前言 职场的"金三银四"跳槽季过去了,但是紧接着,眼下"金九银十"又来了. 不同的是今年比往年「冷」一些,形式更加严峻一些,大家多多少少可能都听到或看到一些信息 ...

  2. Cortex-A7 MPCore 架构详细介绍(九种运行模式、内核寄存器组R0~R15,有特定的名字和功能)

    目录 0.ARM架构的历史简介 1.Cortex-A7 MPCore(即多核) 简介 2.Cortex-A 处理器九种运行模式 3.Cortex-A 寄存器组(内核寄存器) 3.1通用寄存器 3.1. ...

  3. 第二十九章 管理许可(二)

    文章目录 第二十九章 管理许可(二) 激活许可证密钥 更新许可证密钥 许可证故障排除 Administrator Terminal Session Administrator Session on W ...

  4. 多线程教程(九)偏向锁

    多线程教程(九)偏向锁 轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 CAS 操作. Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 M ...

  5. JUC笔记-共享模型之管程 (Monitor)

    JUC-共享模型之管程( Monitor) 一.线程安全问题(重点) 1.1 同步 1.2 线程出现问题的根本原因分析 1.3 synchronized 解决方案 1.3.1 同步代码块 1.3.2 ...

  6. cetus权限连接主从mysql_网易开源中间件 -Cetus监控模块

    01 概述 本文主要对Cetus的监控模块的使用及原理进行介绍,并介绍Cetus使用过程中,监控模块常见的问题及解决方法. Cetus监控模块拥有独立的监控线程,主要是对Cetus后端各个MySQL实 ...

  7. MySQL数据库(九) 集群 Cluster 和性能优化

    文章目录 6 MySQL 集群 Cluster 6.1 MySQL主从复制 6.1.1 主从复制架构和原理 6.1.2 实现主从复制配置 6.1.3 主从复制相关 6.1.4 实现级联复制 6.1.5 ...

  8. 软考学院九老师简介:国家软考(中高级认证)培训

    王老师 课课家软考学院九老师(原称呼:王老师)拥有11年IT职业培训生涯,5600多学时的授课经历,国内IT培训金牌讲师.具有丰富的教学和实践经验,对IT职业培训有深刻的理解.完成40多个大中型项目, ...

  9. NeHe OpenGL第二十九课:Blt函数

    NeHe OpenGL第二十九课:Blt函数 Blitter 函数: 类似于DirectDraw的blit函数,过时的技术,我们有实现了它.它非常的简单,就是把一块纹理贴到另一块纹理上. 这篇文章是有 ...

最新文章

  1. ImageMagick简单记录
  2. 这场景像不像你修Bug的样子?
  3. 艾伟也谈项目管理,谈谈如何说“不”
  4. Huawei S8512
  5. Error:fatal: Not a git repository (or any of the parent directories): .git
  6. 安卓APP_ 四大基本组件(1)—— Activity
  7. 对CORS OPTIONS预检请求的一些思考
  8. [SpringBoot2]定制化原理_SpringBoot定制化组件的几种方式
  9. Oracle/Mysql查看锁表与解锁表
  10. php exif信息,php如何给jpg图片写入exif信息?
  11. k8s高可用集群_搭建高可用集群(部署haproxy和安装docker以及其他组件)---K8S_Google工作笔记0056
  12. 微信小程序后端Java接口开发
  13. 计算机二级机试题型,计算机二级机试题库
  14. 程序员:我终于知道post和get的区别
  15. QQ互联一直显示“未提交审核”
  16. igraph基本使用方法示例
  17. ros软路由防火墙(双线路、日志配置)
  18. 索尼xz1c 日版融卡扩容经验分享(无需root,索尼XZP机型也适用)
  19. VR虚拟线上展馆之水资源绿色发展博览会
  20. 机器人工程→合适的规划←

热门文章

  1. matlab编程反演S参数求电磁参数,如何通过CST仿真出来的S参数获得SRR结构的等效电磁参数...
  2. 手机相机变成QQ摄像头 先锋P80W
  3. 富文本编辑器——UEditor的使用——基础积累
  4. 【财务危机】--2018.6债务
  5. boost 1.57在VC2012里编译出错以及解决
  6. ts多个type合并, 属性不唯一合并冲突问题
  7. Java中的emoji表情
  8. 【推荐算法】图解抖音推荐算法
  9. java康纳塔碳纤维硬吗,碳纤维车身是最硬的吗 最奢侈的汽车材料就是它
  10. Maven技术快速入门,高级java开发工程师的工作总结