Netty框架之HashedWheelTimer
Netty简介
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Redis也是基于事件驱动框架开发的。
Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
Netty官网:Netty: Home
源码在github镜像下载链接:gitclone.com
HashedWheelTimer
本篇文章的主角是Netty框架工具类下面的HashedWheelTimer,它是一个为I/O超时调度而优化的计时器。
官方文档:HashedWheelTimer (Netty API Reference (4.0.56.Final))
下面以一个简单的Demo为例,每秒打印一下当前时间,
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;public class HashedWheelTimerDemo {// 1个槽代表的时间,默认100msprivate static final int TICK_DURATION = 1;// 延时任务时间,即等待多久开始执行时间轮任务private static final int DELAY_TASK_TIME = 60;// hashedWheelTimer设置3600个槽,相当于一个圆的1/3600,每移动一个槽的时间是1秒。private static final int TICKS_PER_WHEEL = 3600;private static final HashedWheelTimer TIMER = new HashedWheelTimer(TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);private static final int EACH_TASK_TIME = 1;public static void main(String[] args) {// 任务需要经过的tick数为: 60000 / 1000 = 60次,即经过1分钟后才是正常的每秒输出当前时间executeTask(TIMER.newTimeout(HashedWheelTimerDemo::executeTask, DELAY_TASK_TIME, TimeUnit.SECONDS));}private static void executeTask(Timeout timeout) {try {LocalDateTime now = LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault());LocalDateTime expiredTime = now.minusSeconds(EACH_TASK_TIME);System.out.println(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(expiredTime));} finally {TIMER.newTimeout(timeout.task(), EACH_TASK_TIME, TimeUnit.SECONDS);}}
}
上面是一个简单的Demo,当然使用它还是很简单的,那么接下来了解一下它怎么实现的,可以找到HashedWheelTimer的源码,看看每次超时是在做什么?
HashedWheelTimer的有参构造函数传入了三个参数:
- tickDuration:一个 bucket 代表的时间,默认为 100ms
- unit:时间单位
- ticksPerWheel:一轮含有多少个 bucket ,默认为 512 个
HashedWheelTimer类中有多个有参构造函数,把默认值传递到了最下面的6个参数的有参构造函数中去。
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {this.worker = new HashedWheelTimer.Worker();this.startTimeInitialized = new CountDownLatch(1);this.timeouts = PlatformDependent.newMpscQueue();this.cancelledTimeouts = PlatformDependent.newMpscQueue();this.pendingTimeouts = new AtomicLong(0L);if (threadFactory == null) {throw new NullPointerException("threadFactory");} else if (unit == null) {throw new NullPointerException("unit");} else if (tickDuration <= 0L) {throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);} else if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);} else {this.wheel = createWheel(ticksPerWheel);this.mask = this.wheel.length - 1;long duration = unit.toNanos(tickDuration);if (duration >= 9223372036854775807L / (long)this.wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, 9223372036854775807L / (long)this.wheel.length));} else {if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}this.workerThread = threadFactory.newThread(this.worker);this.leak = !leakDetection && this.workerThread.isDaemon() ? null : leakDetector.track(this);this.maxPendingTimeouts = maxPendingTimeouts;if (INSTANCE_COUNTER.incrementAndGet() > 64 && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}}}
线程工厂根据worker创建的工作线程在下面的newTimeout方法中启动,
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {// 计时器任务为空抛异常if (task == null) {throw new NullPointerException("task");// 时延单位为空抛异常} else if (unit == null) {throw new NullPointerException("unit");} else {// 原子计数器 +1long pendingTimeoutsCount = this.pendingTimeouts.incrementAndGet();// 计数器大于最大计数超时值时 -1并抛异常if (this.maxPendingTimeouts > 0L && pendingTimeoutsCount > this.maxPendingTimeouts) {this.pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending timeouts (" + this.maxPendingTimeouts + ")");} else {// 工作状态更新,并启动工作线程this.start();long deadline = System.nanoTime() + unit.toNanos(delay) - this.startTime;// 如果时延大于0且deadline小于0,则循环if (delay > 0L && deadline < 0L) {deadline = 9223372036854775807L;}// 时间轮超时队列排队HashedWheelTimer.HashedWheelTimeout timeout = new HashedWheelTimer.HashedWheelTimeout(this, task, deadline);this.timeouts.add(timeout);return timeout;}}}
大体的实现结构如下图,
下面看一下Worker线程中做了什么?
private final class Worker implements Runnable {... ...public void run() {// 初始化启动时间HashedWheelTimer.this.startTime = System.nanoTime();// startTime 0表示未被初始化,1表示已被初始化if (HashedWheelTimer.this.startTime == 0L) {HashedWheelTimer.this.startTime = 1L;}// CountDownLatch.countDown减1,直到计数器为0继续往下走HashedWheelTimer.this.startTimeInitialized.countDown();int idx;HashedWheelTimer.HashedWheelBucket bucket;do {long deadline = this.waitForNextTick();// 处理超时的任务if (deadline > 0L) {idx = (int)(this.tick & (long)HashedWheelTimer.this.mask);this.processCancelledTasks();bucket = HashedWheelTimer.this.wheel[idx];this.transferTimeoutsToBuckets();bucket.expireTimeouts(deadline);++this.tick;}} while(HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 1);HashedWheelTimer.HashedWheelBucket[] var5 = HashedWheelTimer.this.wheel;int var2 = var5.length;for(idx = 0; idx < var2; ++idx) {bucket = var5[idx];bucket.clearTimeouts(this.unprocessedTimeouts);}while(true) {HashedWheelTimer.HashedWheelTimeout timeout = (HashedWheelTimer.HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();if (timeout == null) {this.processCancelledTasks();return;}if (!timeout.isCancelled()) {this.unprocessedTimeouts.add(timeout);}}}... ...
}
Netty框架之HashedWheelTimer相关推荐
- 【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )
文章目录 一. NIO 原生 API 弊端 二. Netty 简介 三. Netty 架构 四. Netty 版本 五. Netty 线程模型 六. 阻塞 IO 线程模型 七. 反应器 ( React ...
- Netty框架整体架构及源码知识点
Netty概述 Netty是一个高性能.异步事件驱动的NIO框架,它提供了对TCP.UDP和文件传输的支持.作为当前最流行的NIO框架,Netty在互联网领域.大数据分布式计算领域.游戏行业.通信行业 ...
- Netty框架多人聊天案例,代码示例
Netty框架多人聊天案例,代码示例 pom <?xml version="1.0" encoding="UTF-8"?> <project ...
- Netty框架入门案例,代码示例
Netty框架入门案例 pom文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns ...
- netty框架实现websocket达到高并发
websocket(三) 进阶!netty框架实现websocket达到高并发 引言: 在前面两篇文章中,我们对原生websocket进行了了解,且用demo来简单的讲解了其用法.但是在实际项目中,那 ...
- Netty框架中的@Skip使用说明
最近在学习Netty框架,对着教程上写了个简单的netty应用,可是死活调试不成功,对着程序跟教程上看了几遍也找不到原因,后来又重新写了一遍,服务端程序终于调试成功,原因出在了那个@Skip注释上了, ...
- Netty框架之责任链模式及其应用
Netty框架之概述及基本组件介绍 Reactor网络编程模型解析 前言 在上篇博客介绍完netty框架的基本组件介绍和概述,也跟着代码看了下NioEventLoopGroup的启动过程,以及基于Re ...
- JavaSocket编程之Netty框架线程模型
1.Netty概述 Netty是一个由JBoss提供的高效的Java NIO client-server(客户端-服务器)开发框架,使用Netty可以快速开发网络应用.Netty提供了一种新的方式来使 ...
- Netty框架之TCP粘包/半包解决方案
Netty框架之TCP粘包/半包解决方案 一.TCP粘包 二.TCP半包 三.TCP粘包/半包解决方案 1.FixedLengthFrameDecoder定长解析器 2.LineBasedFrameD ...
最新文章
- SQL Server 2012 安全新特性:包含数据库
- CVPR2019接收结果公布了,但CVPR 2018的那些论文都怎么样了?
- Linux命令学习(三):文件操作命令(1)
- SiteMapCreator 发布 (Open Source)
- 基于依存句法与语义角色标注的事件抽取项目
- javascript用window open的子窗口关闭自己并且刷新父窗口
- pip matplotlib 使用镜像源,pytorch 1.5 cpu
- ModuleNotFoundError: No module named ‘torchversion‘
- 前端面试总结--数据结构与算法五
- 海外 Android 三方应用市场
- 如何倒出2两酒(5)
- 关于TypeError: e[h] is not a function的问题
- html5 m4a,Audio Unit播放aac/m4a/mp3等文件
- vue3中使用tsx
- 【跨域】Access-Control-Allow-Origin 简单介绍
- 兰卡斯特大学 计算机,兰卡斯特大学计算机科学与信息系统Computer Science and Information Systems世界排名2020年最新排名第151-200位(QS世界排名)...
- c# 谷歌内核cefsharp的简单使用以及实现 webBrowser.Navigate(..,post data)类似的功能
- 编译 JellyBean 的时候遇到的问题
- 基于以太坊区块链的物联网/IoT设备管理【论文及源码】
- 数学建模之主成分分析(PCA)
热门文章
- U盘文件或目录损坏且无法读取怎么修复
- Springboot毕设项目简生活视频网站e6m17(java+VUE+Mybatis+Maven+Mysql)
- github:Git 常用指令
- c 语言 聚餐人数统计
- VMware16安装macOS10.15.1 - 黑苹果 - osx虚拟机
- 推荐~某位前辈的求职经历及建议~超详细
- c++画一个简单的圣诞树
- Windows 7 ms17-010永恒之蓝漏洞复现
- Redis 客户端源码分析+实现
- c语言程序排列与组合,C语言实现排列/组合算法