简化版Netty源码

netty学习目录
一、Netty(一) NIO例子
二、Netty(二) netty服务端
三、Netty(三) Netty客户端+服务端
四、Netty(四) 简化版Netty源码
五、Netty(五)Netty5.x服务端
六、Netty(六) Netty Http 服务器例子
七、Netty(七) Netty服务端+客户端代码
八、Netty(八) Netty多客户端连接例子
九、Netty(九) Netty会话清除
十、Netty(十) Netty自定义编码器解码器
十一、Netty(十一) Netty对象传输


package com.zqw.nio.netty.n3.pool;import java.nio.channels.ServerSocketChannel;public interface Boss {/*** 加入一个新的ServerSocket* @param serverChannel*/public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.zqw.nio.netty.n3.pool;import com.zqw.nio.netty.n3.NioServerBoss;
import com.zqw.nio.netty.n3.NioServerWorker;import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;public class NioSelectorRunnablePool {/*** boss线程数组*/private final AtomicInteger bossIndex = new AtomicInteger();private Boss[] bosses;/*** worker线程数组*/private final AtomicInteger workerIndex = new AtomicInteger();private Worker[] workeres;public NioSelectorRunnablePool(Executor boss, Executor worker) {initBoss(boss, 1);initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);}/*** 初始化boss线程* @param boss* @param count*/private void initBoss(Executor boss, int count) {this.bosses = new NioServerBoss[count];for (int i = 0; i < bosses.length; i++) {bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);}}/*** 初始化worker线程* @param worker* @param count*/private void initWorker(Executor worker, int count) {this.workeres = new NioServerWorker[count];for (int i = 0; i < workeres.length; i++) {workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this);}}/*** 获取一个worker* @return*/public Worker nextWorker() {return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];}/*** 获取一个boss* @return*/public Boss nextBoss() {return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];}}
package com.zqw.nio.netty.n3.pool;import java.nio.channels.SocketChannel;public interface Worker {/*** 加入一个新的客户端会话* @param channel*/public void registerNewChannelTask(SocketChannel channel);}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;public abstract class AbstractNioSelector implements Runnable {/*** 线程池*/private final Executor executor;/*** 选择器*/protected Selector selector;/*** 选择器wakenUp状态标记*/protected final AtomicBoolean wakenUp = new AtomicBoolean();/*** 任务队列*/private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();/*** 线程名称*/private String threadName;/*** 线程管理对象*/protected NioSelectorRunnablePool selectorRunnablePool;AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {this.executor = executor;this.threadName = threadName;this.selectorRunnablePool = selectorRunnablePool;openSelector();}/*** 获取selector并启动线程*/private void openSelector() {try {this.selector = Selector.open();} catch (IOException e) {throw new RuntimeException("Failed to create a selector.");}executor.execute(this);}@Overridepublic void run() {Thread.currentThread().setName(this.threadName);while (true) {try {wakenUp.set(false);select(selector);processTaskQueue();process(selector);} catch (Exception e) {// ignore}}}/*** 注册一个任务并激活selector* * @param task*/protected final void registerTask(Runnable task) {taskQueue.add(task);Selector selector = this.selector;if (selector != null) {if (wakenUp.compareAndSet(false, true)) {selector.wakeup();}} else {taskQueue.remove(task);}}/*** 执行队列里的任务*/private void processTaskQueue() {for (;;) {final Runnable task = taskQueue.poll();if (task == null) {break;}task.run();}}/*** 获取线程管理对象* @return*/public NioSelectorRunnablePool getSelectorRunnablePool() {return selectorRunnablePool;}/*** select抽象方法* * @param selector* @return* @throws IOException*/protected abstract int select(Selector selector) throws IOException;/*** selector的业务处理* * @param selector* @throws IOException*/protected abstract void process(Selector selector) throws IOException;}
package com.zqw.nio.netty.n3;import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;import com.zqw.nio.netty.n3.pool.Boss;
import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;
import com.zqw.nio.netty.n3.pool.Worker;public class NioServerBoss extends AbstractNioSelector implements Boss {public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {super(executor, threadName, selectorRunnablePool);}@Overrideprotected void process(Selector selector) throws IOException {Set<SelectionKey> selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {return;}for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {SelectionKey key = i.next();i.remove();ServerSocketChannel server = (ServerSocketChannel) key.channel();// 新客户端SocketChannel channel = server.accept();// 设置为非阻塞channel.configureBlocking(false);// 获取一个workerWorker nextworker = getSelectorRunnablePool().nextWorker();// 注册新客户端接入任务nextworker.registerNewChannelTask(channel);System.out.println("新客户端链接");}}public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){final Selector selector = this.selector;registerTask(new Runnable() {@Overridepublic void run() {try {//注册serverChannel到selectorserverChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (ClosedChannelException e) {e.printStackTrace();}}});}@Overrideprotected int select(Selector selector) throws IOException {return selector.select();}}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;
import com.zqw.nio.netty.n3.pool.Worker;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;public class NioServerWorker extends AbstractNioSelector implements Worker {public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {super(executor, threadName, selectorRunnablePool);}@Overrideprotected void process(Selector selector) throws IOException {Set<SelectionKey> selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {return;}Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();while (ite.hasNext()) {SelectionKey key = (SelectionKey) ite.next();// 移除,防止重复处理ite.remove();// 得到事件发生的Socket通道SocketChannel channel = (SocketChannel) key.channel();// 数据总长度int ret = 0;boolean failure = true;ByteBuffer buffer = ByteBuffer.allocate(1024);//读取数据try {ret = channel.read(buffer);failure = false;} catch (Exception e) {// ignore}//判断是否连接已断开if (ret <= 0 || failure) {key.cancel();System.out.println("客户端断开连接");}else{System.out.println("收到数据:" + new String(buffer.array()));//回写数据ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());channel.write(outBuffer);// 将消息回送给客户端}}}/*** 加入一个新的socket客户端*/public void registerNewChannelTask(final SocketChannel channel){final Selector selector = this.selector;registerTask(new Runnable() {@Overridepublic void run() {try {//将客户端注册到selector中channel.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();}}});}@Overrideprotected int select(Selector selector) throws IOException {return selector.select(500);}}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.Boss;
import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool;public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {this.selectorRunnablePool = selectorRunnablePool;}/*** 绑定端口* @param localAddress*/public void bind(final SocketAddress localAddress){try {// 获得一个ServerSocket通道ServerSocketChannel serverChannel = ServerSocketChannel.open();// 设置通道为非阻塞serverChannel.configureBlocking(false);// 将该通道对应的ServerSocket绑定到port端口serverChannel.socket().bind(localAddress);//获取一个boss线程Boss nextBoss = selectorRunnablePool.nextBoss();//向boss注册一个ServerSocket通道nextBoss.registerAcceptChannelTask(serverChannel);} catch (Exception e) {e.printStackTrace();}}
}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;public class Start {public static void main(String[] args) {//初始化线程NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());//获取服务类ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);//绑定端口bootstrap.bind(new InetSocketAddress(10101));System.out.println("start");}}

Netty(四) 简化版Netty源码相关推荐

  1. 适合新手:从零开发一个IM服务端(基于Netty,有完整源码)

    本文由"yuanrw"分享,博客:juejin.im/user/5cefab8451882510eb758606,收录时内容有改动和修订. 0.引言 站长提示:本文适合IM新手阅读 ...

  2. java在线客服系统源码 springboot客服聊天源码 网页客服源码 netty通信技术,java源码

    ava在线客服系统源码 springboot客服聊天源码 网页客服源码 netty通信技术,java源码 Java在线客服系统源码 企业网站客服聊天源码 网页客服源码 开发环境:Java + Spri ...

  3. Android音频实时传输与播放(四):源码下载(问题更新)【转】

    Android音频实时传输与播放(四):源码下载(问题更新) 激动人心的时刻到了有木有 ^_^ 服务端下载请点击这里,客户端下载请点击这里! 最近有朋友在下载源码使用之后,说播放出来的声音噪声很大.其 ...

  4. 短网址生成+域名检测+短网址还原等四合一前端源码

    介绍: 短网址生成+域名检测+短网址还原等四合一前端源码没有功能哦~!有会后端技术的可以拿去开发用!页面还是挺漂亮的,里面还有很多子页面都非常的好看! 网盘下载地址: http://kekewangL ...

  5. 第十四课 k8s源码学习和二次开发原理篇-调度器原理

    第十四课 k8s源码学习和二次开发原理篇-调度器原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第十四课 k8s源码学习和二次开发原理篇-调度器原理 第一节 ...

  6. @Import注解:导入配置类的四种方式源码解析

    微信搜索:码农StayUp 主页地址:https://gozhuyinglong.github.io 源码分享:https://github.com/gozhuyinglong/blog-demos ...

  7. 第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习

    第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第四课 k8s源码学习和二次开发-De ...

  8. Netty学习十七:源码分析之HashWheelTimer

    一.常见定时任务实现 定时器的使用场景包括:成月统计报表.财务对账.会员积分结算.邮件推送等,它一般有三种表现形式:按固定周期定时执行.延迟一定时间后执行.指定某个时刻执行. 定时器的本质是设计一种数 ...

  9. netty tcp服务端主动断开客户端_「Netty核心技术」6-ChannelPipeline源码

    ChannelPipeline是Channelhandler的容器,它负责ChannelHandler的管理和事件拦截与调度. 土话: ChannelPipeline就是用来管理Channelhand ...

最新文章

  1. CI框架如何删除地址栏的 index.php
  2. nas服务器搭建 linux,小白都能看懂的NAS服务器搭建教程
  3. 计算机组装维护的概念,实用计算机组装与维护库及概念.doc
  4. Python基础教程:函数名本质
  5. eclipse索引4超出范围_Python内置的4个重要基本数据结构:列表、元组、字典和集合
  6. linux的用户及权限管理,用户及权限管理
  7. 我的第一个python web开发框架(23)——代码版本控制管理与接口文档
  8. android p官方铃声,Android 铃声播放
  9. 课堂对Complex类运算符重载的小练习
  10. CSS expression VS Script event
  11. Centos7安装SCIP with AMPL
  12. 基于confd和etcd的tuxedo中间件容器化方案
  13. VSCode下载安装最新详细教程2022(win10)
  14. DAY1——sql 建表/插入数据
  15. html5获取经纬度页面,html5获取经纬度
  16. 数学中倒三角:梯度;正三角:拉普拉斯
  17. 大数据存储引擎-bigstore
  18. steam好友网络无法连接
  19. 计算机应用大赛动员大会,计算机应用工程系第十二届学生会动员大会
  20. 少室山论道——武学修炼之道

热门文章

  1. 超宽带(UWB)学习笔记——消除多径误差的第一径检测算法
  2. Python之——实现自动抢火车票(基于Python3.6+splinter)
  3. Google Glass众叛亲离?
  4. JavaScript在chrome浏览器实现录屏功能
  5. ffserver + hls
  6. 怎么找到一抛物线数组的顶点_一般抛物线的顶点怎么求?
  7. 300字的计算机英语作文,大学英语作文300字【五篇】
  8. 树冠体积计算之AlphaShape算法
  9. 沟通是管理的核心与灵魂
  10. STP-17-对抗单向链路问题