分布式理论、架构设计自定义RPC

  • 第一部分-RPC框架设计
    • 1. Socket回顾与I/0模型
      • 1.1 Socket网络编程回顾
        • 1.1.1 Socket概述
        • 1.1.2 Socket整体流程
        • 1.1.3 代码实现
      • 1.2 I/O模型
        • 1.2.1 I/O模型说明
        • 1.2.2 BIO(同步并阻塞)
        • 1.2.3 NIO(同步非阻塞)
        • 1.2.4 AIO(异步非阻塞)
        • 1.2.5 BIO、NIO、AIO 适用场景分析
    • 2.NIO编程
      • 2.1 NIO介绍
      • 2.2 NIO和 BIO的比较
      • 2.3 NIO 三大核心原理示意图
      • 2.4 缓冲区(Buffer)
        • 2.4.1 基本介绍
        • 2.4.2 Buffer常用API介绍
      • 2.5 通道(Channel)
        • 2.5.1 基本介绍
        • 2.5.2 Channel常用类介绍
        • 2.5.3 ServerSocketChannel
        • 2.5.4 SocketChannel
      • 2.6 Selector (选择器)
        • 2.6.1 基本介绍
        • 2.6.2 常用API介绍
        • 2.6.3 Selector 编码
    • 3.Netty核心原理
      • 3.1 Netty 介绍
        • 3.1.1 原生 NIO 存在的问题
        • 3.1.2 概述
      • 3.2 线程模型
        • 3.2.1 线程模型基本介绍
        • 3.2.2 传统阻塞 I/O 服务模型
        • 3.2.3 Reactor 模型
        • 3.2.4 Netty线程模型
      • 3.3 核心API介绍
        • 3.3.1 ChannelHandler及其实现类
        • 3.3.2 ChannelPipeline
        • 3.3.3 ChannelHandlerContext
        • 3.3.4 ChannelOption
        • 3.3.5 ChannelFuture
        • 3.3.6 EventLoopGroup和实现类NioEventLoopGroup
        • 3.3.7 ServerBootstrap和Bootstrap
        • 3.3.8 Unpooled类
        • 3.4.1 Netty服务端编写
        • 3.4.2 Netty客户端编写
      • 3.5 Netty异步模型
        • 3.5.1 基本介绍
        • 3.5.2 Future 和Future-Listener
    • 4.Netty高级应用
      • 4.1 Netty编解码器
        • 4.1.1 Java的编解码
        • 4.1.2 Netty编解码器
      • 4.2 Netty案例-群聊天室
        • 4.2.1 聊天室服务端编写
        • 4.2.2 聊天室客户端编写
      • 4.3 基于Netty的Http服务器开发
        • 4.3.1 介绍
        • 4.3.2 功能需求
        • 4.3.3 服务端代码实现
      • 4.4 基于Netty的WebSocket开发网页版聊天室
        • 4.4.1 WebSocket简介
        • 4.4.2 WebSocket和HTTP的区别
        • 4.4.3 导入基础环境
        • 4.4.4 服务端开发
      • 4.5 Netty中粘包和拆包的解决方案
        • 4.5.1 粘包和拆包简介
        • 4.5.2 粘包和拆包代码演示
    • 5.Netty核心源码剖析
    • 6. 自定义RPC框架
      • 6.1 分布式架构网络通信
        • 6.1.1 基本原理
        • 6.1.2 什么是RPC
        • 5.1.3 RMI
      • 5.2 基于Netty实现RPC框架
        • 5.2.1 需求介绍

第一部分-RPC框架设计

1. Socket回顾与I/0模型

1.1 Socket网络编程回顾

1.1.1 Socket概述

Socket,套接字就是两台主机之间逻辑连接的端点。TCP/IP协议是传输层协议,主要解决数据如何在网络中传输,而HTTP是应用层协议,主要解决如何包装数据。Socket是通信的基石,是支持TCP/IP协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议、本地主机的IP地址、本地进程的协议端口、远程主机的IP地址、远程进程的协议端口。

1.1.2 Socket整体流程

Socket编程主要涉及到客户端和服务端两个方面,首先是在服务器端创建一个服务器套接字(ServerSocket),并把它附加到一个端口上,服务器从这个端口监听连接。端口号的范围是0到65536,但是0到1024是为特权服务保留的端口号,可以选择任意一个当前没有被其他进程使用的端口。
客户端请求与服务器进行连接的时候,根据服务器的域名或者IP地址,加上端口号,打开一个套接字。当服务器接受连接后,服务器和客户端之间的通信就像输入输出流一样进行操作。

1.1.3 代码实现

  1. 服务端代码
package com.lagou.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ServerDemo {
public static void main(String[] args) throws Exception {
//1.创建一个线程池,如果有客户端连接就创建一个线程, 与之通信
ExecutorService executorService = Executors.newCachedThreadPool();
//2.创建 ServerSocket 对象
ServerSocket serverSocket = new ServerSocket(9999);
System.out.println("服务器已启动");
while (true) {
//3.监听客户端
Socket socket = serverSocket.accept();
System.out.println("有客户端连接");
//4.开启新的线程处理
executorService.execute(new Runnable() {
@Override
public void run() {
handle(socket);
}
});
}
}
public static void handle(Socket socket) {
try {
System.out.println("线程ID:" + Thread.currentThread().getId()
+ " 线程名称:" + Thread.currentThread().getName());
//从连接中取出输入流来接收消息
InputStream is = socket.getInputStream();
byte[] b = new byte[1024];
int read = is.read(b);
System.out.println("客户端:" + new String(b, 0, read));
//连接中取出输出流并回话
OutputStream os = socket.getOutputStream();
os.write("没钱".getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//关闭连接
socket.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}
  1. 客户端代码
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;
public class ClientDemo {
public static void main(String[] args) throws Exception {
while (true) {
//1.创建 Socket 对象
Socket s = new Socket("127.0.0.1", 9999);
//2.从连接中取出输出流并发消息
OutputStream os = s.getOutputStream();
System.out.println("请输入:");
Scanner sc = new Scanner(System.in);
String msg = sc.nextLine();
os.write(msg.getBytes());
//3.从连接中取出输入流并接收回话
InputStream is = s.getInputStream();
byte[] b = new byte[1024];
int read = is.read(b);
System.out.println("老板说:" + new String(b, 0, read).trim());
//4.关闭
s.close();
}
}
}

1.2 I/O模型

1.2.1 I/O模型说明

  1. I/O 模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能
  2. Java 共支持 3 种网络编程模型/IO 模式:BIO(同步并阻塞)、NIO(同步非阻塞)、AIO(异步非阻塞)
    阻塞与非阻塞
    主要指的是访问IO的线程是否会阻塞(或处于等待)
    线程访问资源,该资源是否准备就绪的一种处理方式


同步和异步
主要是指的数据的请求方式
同步和异步是指访问数据的一种机制

1.2.2 BIO(同步并阻塞)

Java BIO就是传统的 socket编程.
BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。
工作机制

生活中的例子:

BIO问题分析

  1. 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write
  2. 并发数较大时,需要创建大量线程来处理连接,系统资源占用较大
  3. 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费

1.2.3 NIO(同步非阻塞)

同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理

生活中的例子:

1.2.4 AIO(异步非阻塞)

AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用
Proactor 模式是一个消息异步通知的设计模式,Proactor 通知的不是就绪事件,而是操作完成事件,这也就是操作系统异步 IO 的主要模型。
生活中的例子:

1.2.5 BIO、NIO、AIO 适用场景分析

  1. BIO(同步并阻塞) 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解
  2. NIO(同步非阻塞) 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4 开始支持
  3. AIO(异步非阻塞) 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作, 编程比较复杂,JDK7 开始支持。

2.NIO编程

2.1 NIO介绍

Java NIO 全称java non-blocking IO ,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的.

  1. NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
  2. NIO是 面向缓冲区编程的。数据读取到一个缓冲区中,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
  3. Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入, 这个线程同时可以去做别的事情。通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个

2.2 NIO和 BIO的比较

  1. BIO 以流的方式处理数据,而 NIO 以缓冲区的方式处理数据,缓冲区 I/O 的效率比流 I/O 高很多
  2. BIO 是阻塞的,NIO则是非阻塞的
  3. BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求, 数据到达等),因此使用单个线程就可以监听多个客户端通道

2.3 NIO 三大核心原理示意图

一张图描述 NIO 的 Selector 、 Channel 和 Buffer 的关系

  1. 每个 channel 都会对应一个 Buffer
  2. Selector 对应一个线程, 一个线程对应多个 channel(连接)
  3. 每个 channel 都注册到 Selector选择器上
  4. Selector不断轮询查看Channel上的事件, 事件是通道Channel非常重要的概念
  5. Selector 会根据不同的事件,完成不同的处理操作
  6. Buffer 就是一个内存块 , 底层是有一个数组
  7. 数据的读取写入是通过 Buffer, 这个和 BIO , BIO 中要么是输入流,或者是输出流, 不能双向,但是NIO 的 Buffer 是可以读也可以写 , channel 是双向的.

2.4 缓冲区(Buffer)

2.4.1 基本介绍

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个数组,该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer.

2.4.2 Buffer常用API介绍

  1. Buffer 类及其子类
    在 NIO 中,Buffer是一个顶层父类,它是一个抽象类, 类的层级关系图,常用的缓冲区分别对应byte,short, int, long,float,double,char 7种.
  2. 缓冲区对象创建
方法名 说明
static ByteBuffer allocate(长度) 创建byte类型的指定长度的缓冲区
static ByteBuffer wrap(byte[] array) 创建一个有内容的byte类型缓冲区

示例代码:

import java.nio.ByteBuffer;
/**
* 创建缓冲区
*/
public class CreateBufferDemo {
public static void main(String[] args) {
//1.创建一个指定长度的缓冲区, 以ByteBuffer为例
ByteBuffer byteBuffer = ByteBuffer.allocate(5);
for (int i = 0; i < 5; i++) {
System.out.println(byteBuffer.get());
}
//在此调用会报错--后续再读缓冲区时着重讲解
//System.out.println(byteBuffer.get());
//2.创建一个有内容的缓冲区
ByteBuffer wrap = ByteBuffer.wrap("lagou".getBytes());
for (int i = 0; i < 5; i++) {
System.out.println(wrap.get());
}
}
}
  1. 缓冲区对象添加数据
方法名 说明
int position()/position(int newPosition) 获得当前要操作的索引/修改当前要操作的索引位置
int limit()/limit(int newLimit) 最多能操作到哪个索引/修改最多能操作的索引位置
int capacity() 返回缓冲区的总长度
int remaining()/boolean hasRemaining() 还有多少能操作索引个数/是否还有能操作
put(byte b)/put(byte[] src) 添加一个字节/添加字节数组

图解:

示例代码:

import java.nio.ByteBuffer;
/**
* 添加缓冲区
*/
public class PutBufferDemo {
public static void main(String[] args) {
//1.创建一个指定长度的缓冲区, 以ByteBuffer为例
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
System.out.println(byteBuffer.position());//0 获取当前索引所在位置
System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println(byteBuffer.remaining());//10 还有多少个能操作
//修改当前索引位置
//byteBuffer.position(1);
//修改最多能操作到哪个索引位置
//byteBuffer.limit(9);
//System.out.println(byteBuffer.position());//1 获取当前索引所在位置
//System.out.println(byteBuffer.limit());//9 最多能操作到哪个索引
//System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
//System.out.println(byteBuffer.remaining());//8 还有多少个能操作
//添加一个字节
byteBuffer.put((byte) 97);
System.out.println(byteBuffer.position());//1 获取当前索引所在位置
System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println(byteBuffer.remaining());//9 还有多少个能操作
//添加一个字节数组
byteBuffer.put("abc".getBytes());
System.out.println(byteBuffer.position());//4 获取当前索引所在位置
System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println(byteBuffer.remaining());//6 还有多少个能操作
//当添加超过缓冲区的长度时会报错
byteBuffer.put("012345".getBytes());
System.out.println(byteBuffer.position());//10 获取当前索引所在位置
System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println(byteBuffer.remaining());//0 还有多少个能操作
System.out.println(byteBuffer.hasRemaining());// false 是否还能有操作的
数组
// 如果缓存区存满后, 可以调整position位置可以重复写,这样会覆盖之前存入索引的对
应的值
byteBuffer.position(0);
byteBuffer.put("012345".getBytes());
}
}
  1. 缓冲区对象读取数据
方法名 介绍
flip() 写切换读模式 limit设置position位置, position设置0
get() 读一个字节
get(byte[] dst) 读多个字节
get(int index) 读指定索引的字节
rewind() 将position设置为0,可以重复读
clear() 切换写模式 position设置为0 , limit 设置为 capacity
array() 将缓冲区转换成字节数组返回

图解:flip()方法

图解:clear()方法

实例代码:

import java.nio.ByteBuffer;
/**
* 从缓冲区中读取数据
*/
public class GetBufferDemo {
public static void main(String[] args) {
//1.创建一个指定长度的缓冲区
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123".getBytes());
System.out.println("position:" + allocate.position());//4
System.out.println("limit:" + allocate.limit());//10
System.out.println("capacity:" + allocate.capacity());//10
System.out.println("remaining:" + allocate.remaining());//6
//切换读模式
System.out.println("读取数据--------------");
allocate.flip();
System.out.println("position:" + allocate.position());//4
System.out.println("limit:" + allocate.limit());//10
System.out.println("capacity:" + allocate.capacity());//10
System.out.println("remaining:" + allocate.remaining());//6
for (int i = 0; i < allocate.limit(); i++) {
System.out.println(allocate.get());
}
//读取完毕后.继续读取会报错,超过limit值
//System.out.println(allocate.get());
//读取指定索引字节
System.out.println("读取指定索引字节--------------");
System.out.println(allocate.get(1));
System.out.println("读取多个字节--------------");
// 重复读取
allocate.rewind();
byte[] bytes = new byte[4];
allocate.get(bytes);
System.out.println(new String(bytes));
// 将缓冲区转化字节数组返回
System.out.println("将缓冲区转化字节数组返回--------------");
byte[] array = allocate.array();
System.out.println(new String(array));
// 切换写模式,覆盖之前索引所在位置的值
System.out.println("写模式--------------");
allocate.clear();
allocate.put("abc".getBytes());
System.out.println(new String(allocate.array()));
}
}

注意事项:

  1. capacity:容量(长度)limit: 界限(最多能读/写到哪里)posotion:位置(读/写哪个索引)
  2. 获取缓冲区里面数据之前,需要调用flip方法
  3. 再次写数据之前,需要调用clear方法,但是数据还未消失,等再次写入数据,被覆盖了才会消失。

2.5 通道(Channel)

2.5.1 基本介绍

通常来说NIO中的所有IO都是从 Channel(通道) 开始的。NIO 的通道类似于流,但有些区别如下:

  1. 通道可以读也可以写,流一般来说是单向的(只能读或者写,所以之前我们用流进行IO操作的时候需要分别创建一个输入流和一个输出流)
  2. 通道可以异步读写
  3. 通道总是基于缓冲区Buffer来读写

2.5.2 Channel常用类介绍

  1. Channel接口
    常 用 的Channel实现类类 有 :FileChannel , DatagramChannel ,ServerSocketChannel和SocketChannel 。FileChannel 用于文件的数据读写, DatagramChannel 用于 UDP 的数据读写, ServerSocketChannel 和SocketChannel 用于 TCP 的数据读写。【ServerSocketChanne类似 ServerSocket , SocketChannel 类似 Socket】

  1. SocketChannel 与ServerSocketChannel
    类似 Socke和ServerSocket,可以完成客户端与服务端数据的通信工作.

2.5.3 ServerSocketChannel

服务端实现步骤:
1. 打开一个服务端通道
2. 绑定对应的端口号
3. 通道默认是阻塞的,需要设置为非阻塞
4. 检查是否有客户端连接 有客户端连接会返回对应的通道
5. 获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
6. 给客户端回写数据
7. 释放资源
代码实现:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
/**
* 服务端
*/
public class NIOServer {
public static void main(String[] args) throws IOException,
InterruptedException {
//1. 打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2. 绑定对应的端口号
serverSocketChannel.bind(new InetSocketAddress(9999));
//3. 通道默认是阻塞的,需要设置为非阻塞
// true 为通道阻塞 false 为非阻塞
serverSocketChannel.configureBlocking(false);
System.out.println("服务端启动成功..........");
while (true) {
//4. 检查是否有客户端连接 有客户端连接会返回对应的通道 , 否则返回null
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel == null) {
System.out.println("没有客户端连接...我去做别的事情");
Thread.sleep(2000);
continue;
}
//5. 获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//返回值:
//正数: 表示本次读到的有效字节个数.
//0 : 表示本次没有读到有效字节.
//-1 : 表示读到了末尾
int read = socketChannel.read(byteBuffer);
System.out.println("客户端消息:" +
new String(byteBuffer.array(), 0, read,
StandardCharsets.UTF_8));
//6. 给客户端回写数据
socketChannel.write(ByteBuffer.wrap("没
钱".getBytes(StandardCharsets.UTF_8)));
//7. 释放资源
socketChannel.close();
}
}
}

2.5.4 SocketChannel

实现步骤

  1. 打开通道
  2. 设置连接IP和端口号
  3. 写出数据
  4. 读取服务器写回的数据
  5. 释放资源
    代码实现:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
/**
* 客户端
*/
public class NIOClient {
public static void main(String[] args) throws IOException {
//1.打开通道
SocketChannel socketChannel = SocketChannel.open();
//2.设置连接IP和端口号
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
//3.写出数据
socketChannel.write(ByteBuffer.wrap("老板, 该还钱
拉!".getBytes(StandardCharsets.UTF_8)));
//4.读取服务器写回的数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int read=socketChannel.read(readBuffer);
System.out.println("服务端消息:" + new String(readBuffer.array(), 0, read,
StandardCharsets.UTF_8));
//5.释放资源
socketChannel.close();
}
}

2.6 Selector (选择器)

2.6.1 基本介绍

可以用一个线程,处理多个的客户端连接,就会使用到NIO的Selector(选择器). Selector 能够检测多个注册的服务端通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。


在这种没有选择器的情况下,对应每个连接对应一个处理线程. 但是连接并不能马上就会发送信息,所以还会产生资源浪费

只有在通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程, 避免了多线程之间的上下文切换导致的开销

2.6.2 常用API介绍

  1. Selector 类是一个抽象类

    常用方法:
    Selector.open() : //得到一个选择器对象
    selector.select() : //阻塞 监控所有注册的通道,当有对应的事件操作时, 会将SelectionKey放入集合内部并返回事件数量
    selector.select(1000): //阻塞 1000 毫秒,监控所有注册的通道,当有对应的事件操作时, 会将SelectionKey放入集合内部并返回
    selector.selectedKeys() : // 返回存有SelectionKey的集合
  2. SelectionKey

    常用方法
    SelectionKey.isAcceptable(): 是否是连接继续事件
    SelectionKey.isConnectable(): 是否是连接就绪事件
    SelectionKey.isReadable(): 是否是读就绪事件
    SelectionKey.isWritable(): 是否是写就绪事件

SelectionKey中定义的4种事件:
SelectionKey.OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户端与服务器的连接已经建立成功
SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)

2.6.3 Selector 编码

服务端实现步骤:

  1. 打开一个服务端通道
  2. 绑定对应的端口号
  3. 通道默认是阻塞的,需要设置为非阻塞
  4. 创建选择器
  5. 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
  6. 检查选择器是否有事件
  7. 获取事件集合
  8. 判断事件是否是客户端连接事件SelectionKey.isAcceptable()
  9. 得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
  10. 判断是否是客户端读就绪事件SelectionKey.isReadable()
  11. 得到客户端通道,读取数据到缓冲区
  12. 给客户端回写数据
  13. 从集合中删除对应的事件, 因为防止二次处理.
    代码实现:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
/**
* 服务端-选择器
*/
public class NIOSelectorServer {
public static void main(String[] args) throws IOException,
InterruptedException {
//1. 打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2. 绑定对应的端口号
serverSocketChannel.bind(new InetSocketAddress(9999));
//3. 通道默认是阻塞的,需要设置为非阻塞
serverSocketChannel.configureBlocking(false);
//4. 创建选择器
Selector selector = Selector.open();
//5. 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功...");
while (true) {
//6. 检查选择器是否有事件
int select = selector.select(2000);
if (select == 0) {
continue;
}
//7. 获取事件集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
//8. 判断事件是否是客户端连接事件SelectionKey.isAcceptable()
SelectionKey key = iterator.next();
//9. 得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端已连接......" + socketChannel);
//必须设置通道为非阻塞, 因为selector需要轮询监听每个通道的事件
socketChannel.configureBlocking(false);
//并指定监听事件为OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
}
//10. 判断是否是客户端读就绪事件SelectionKey.isReadable()
if (key.isReadable()) {
//11.得到客户端通道,读取数据到缓冲区
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
if (read > 0) {
System.out.println("客户端消息:" +
new String(byteBuffer.array(), 0, read,
StandardCharsets.UTF_8));
//12.给客户端回写数据
socketChannel.write(ByteBuffer.wrap("没
钱".getBytes(StandardCharsets.UTF_8)));
socketChannel.close();
}
}
//13.从集合中删除对应的事件, 因为防止二次处理.
iterator.remove();
}
}
}
}

3.Netty核心原理

3.1 Netty 介绍

3.1.1 原生 NIO 存在的问题

  1. NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  2. 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
  3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
  4. JDK NIO 的 Bug:臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决
    在NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selector的select方法会一直阻塞,直到IO事件达到或超时,但是在Linux平台上这里有时会出现问题,在某些场景下select方法会直接返回,即使没有超时并且也没有IO事件到达,这就是著名的epoll bug,这是一个比较严重的bug,它会导致线程陷入死循环,会让CPU飙到100%,极大地影响系统的可靠性,到目前为止,JDK都没有完全解决这个问题。

3.1.2 概述

Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

从图中就能看出 Netty 的强大之处:零拷贝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等协议;提供安全传输、压缩、大文件传输、编解码支持等等。
具备如下优点:

  1. 设计优雅,提供阻塞和非阻塞的 Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模
    型。
  2. 具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存复制,减少资源的消耗。
  3. 提供安全传输特性。
  4. 支持多种主流协议;预置多种编解码功能,支持用户开发私有协议。

3.2 线程模型

3.2.1 线程模型基本介绍

不同的线程模式,对程序的性能有很大影响,在学习Netty线程模式之前,首先讲解下 各个线程模式, 最后看看 Netty 线程模型有什么优越性.目前存在的线程模型有:
1传统阻塞 I/O 服务模型
2Reactor 模型
根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现
单 Reactor 单线程
单 Reactor 多线程
主从 Reactor 多线程

3.2.2 传统阻塞 I/O 服务模型

采用阻塞 IO 模式获取输入的数据, 每个连接都需要独立的线程完成数据的输入 , 业务处理和数据返回工作.

存在问题:

  1. 当并发数很大,就会创建大量的线程,占用很大系统资源
  2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费

3.2.3 Reactor 模型

Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 , 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher模式. Reactor 模式使用IO 复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键.

  1. 单 Reactor 单线程
    Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求
    Reactor 对象通过 Selector监控客户端请求事件,收到事件后通过 Dispatch 进行分发是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理
    Handler 会完成 Read→业务处理→Send 的完整业务流程
    优点:
    优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
    缺点:

    1. 性能问题: 只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,
      整个进程无法处理其他连接事件,很容易导致性能瓶颈
    2. 可靠性问题: 线程意外终止或者进入死循环,会导致整个系统通信模块不可用,不能接收和处
      理外部消息,造成节点故障
  2. 单 Reactor多线程

    Reactor 对象通过 selector 监控客户端请求事件, 收到事件后,通过 dispatch 进行分发
    如果建立连接请求, 则右 Acceptor 通过accept 处理连接请求
    如果不是连接请求,则由 reactor 分发调用连接对应的 handler 来处理
    handler 只负责响应事件,不做具体的业务处理, 通过 read 读取数据后,会分发给后面的worker 线程池的某个线程处理业务
    worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
    handler 收到响应后,通过 send 将结果返回给 client

优点:
可以充分的利用多核 cpu 的处理能力
缺点:
多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈

  1. 主从 Reactor 多线程
  • Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过Acceptor 处理客户端连接事件
  • 当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将连接分配给 SubReactor。(即:MainReactor 只负责监听客户端连接请求,和客户端建立连接之后将连接交由 SubReactor 监听后面的 IO 事件。)
  • SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理
  • 当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理
  • Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理
  • Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。Handler 通过 send 向客户端发送响应数据
  • 一个 MainReactor 可以对应多个 SubReactor,即一个 MainReactor 线程可以对应多个SubReactor 线程
    优点:
  1. MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要接收新连接,SubReactor 线程完成后续的业务处理
  2. MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接传给 SubReactor 线程,SubReactor 线程无需返回数据
  3. 多个 SubReactor 线程能够应对更高的并发请求
    缺点:
    这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括Nginx、Memcached、Netty 等。这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。

3.2.4 Netty线程模型

Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。

  1. 简单版Netty模型
  • BossGroup 线程维护 Selector,ServerSocketChannel 注册到这个 Selector 上,只关注连接建立请求事件(主 Reactor)
  • 当接收到来自客户端的连接建立请求事件的时候,通过 ServerSocketChannel.accept 方法获得对应的 SocketChannel,并封装成 NioSocketChannel 注册到 WorkerGroup 线程中的Selector,每个 Selector 运行在一个线程中(从 Reactor)
  • 当 WorkerGroup 线程中的 Selector 监听到自己感兴趣的 IO 事件后,就调用 Handler 进行处理
  1. 进阶版Netty模型
  • 有两组线程池:BossGroup 和 WorkerGroup,BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写
  • BossGroup 和 WorkerGroup 含有多个不断循环的执行事件处理的线程,每个线程都包含一个 Selector,用于监听注册在其上的 Channel
  • 每个 BossGroup 中的线程循环执行以下三个步骤
    1轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
    2处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到WorkerGroup 中某个线程上的 Selector 上
    3再去以此循环处理任务队列中的下一个事件
    每个 WorkerGroup 中的线程循环执行以下三个步骤
    1轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
    2在对应的 NioSocketChannel 上处理 read/write 事件
    3再去以此循环处理任务队列中的下一个事件
  1. 详细版Netty模型
  • Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是NioEventLoopGroup
  • NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就 是一个 NioEventLoop
  • NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个Selector,用于监听注册在其上的 Socket 网络连接(Channel)
  • NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop
  • 每个 BossNioEventLoop 中循环执行以下三个步骤
    select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
    processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上
    runAllTasks:再去以此循环处理任务队列中的其他任务
    每个 WorkerNioEventLoop 中循环执行以下三个步骤
    select:轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
    processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件
    runAllTasks:再去以此循环处理任务队列中的其他任务
  • 在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。

3.3 核心API介绍

3.3.1 ChannelHandler及其实现类

ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具 体的业务逻辑。API 关系如下图所示

Netty开发中需要自定义一个 Handler 类去实现 ChannelHandle接口或其子接口或其实现类,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法
public void channelActive(ChannelHandlerContext ctx),通道就绪事件
public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件

3.3.2 ChannelPipeline

ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的责任链.

如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler. InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler是按照Pipeline的加载顺序,逆序执行

3.3.3 ChannelHandlerContext

这 是 事 件 处 理 器 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同时ChannelHandlerContext 中也绑定了对应的 ChannelPipeline和 Channel 的信息,方便对
ChannelHandler 进行调用。常用方法如下所示:
ChannelFuture close(),关闭通道
ChannelOutboundInvoker flush(),刷新
ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前
ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

3.3.4 ChannelOption

Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:

  • ChannelOption.SO_BACKLOG
    对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。
  • ChannelOption.SO_KEEPALIVE ,一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

3.3.5 ChannelFuture

表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。
常用方法如下所示:
Channel channel(),返回当前正在进行 IO 操作的通道
ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步

3.3.6 EventLoopGroup和实现类NioEventLoopGroup

EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般 会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示:

BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。一般情况下我们都是用实现类NioEventLoopGroup.
常用方法如下所示:
public NioEventLoopGroup(),构造方法,创建线程组
public Future<?> shutdownGracefully(),断开连接,关闭线程

3.3.7 ServerBootstrap和Bootstrap

ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置;
Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下 所示:

  • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup), 该方法用于服务器端,用来设置两个 EventLoop
  • public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
  • public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道 实现
  • public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
  • public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
  • public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务 处理类(自定义的 handler)
  • public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
  • public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连 接服务器端

3.3.8 Unpooled类

这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:
public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据 和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)
3.4 Netty入门案例
Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>

3.4.1 Netty服务端编写

服务端实现步骤:

  1. 创建bossGroup线程组: 处理网络事件–连接事件
  2. 创建workerGroup线程组: 处理网络事件–读写事件
  3. 创建服务端启动助手
  4. 设置bossGroup线程组和workerGroup线程组
  5. 设置服务端通道实现为NIO
  6. 参数设置
  7. 创建一个通道初始化对象
  8. 向pipeline中添加自定义业务处理handler
  9. 启动服务端并绑定端口,同时将异步改为同步
  10. 关闭通道和关闭连接池
    代码实现:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Netty服务端
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3.创建服务端启动助手
ServerBootstrap bootstrap = new ServerBootstrap();
//4.设置线程组
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//5.设置服务端通道实现;
.option(ChannelOption.SO_BACKLOG, 128)//6.参数设置-设置线程队列中等待
连接个数
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//7.参数设
置-设置活跃状态,child是设置workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//8.创建一
个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws
Exception {
//9.向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyServerHandle());
}
});
//10.启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = bootstrap.bind(9999).sync();
System.out.println("服务器启动成功....");
//11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
future.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

自定义服务端handle

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class NettyServerHandle implements ChannelInboundHandler {
/**
* 通道读取事件
*
* @param ctx 通道上下文对象
* @param msg 消息
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发来消息:" +
byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 读取完毕事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端.",
CharsetUtil.UTF_8));
}
/**
* 异常发生事件
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
}
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws
Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}

3.4.2 Netty客户端编写

客户端实现步骤:

  1. 创建线程组
  2. 创建客户端启动助手
  3. 设置线程组
  4. 设置客户端通道实现为NIO
  5. 创建一个通道初始化对象
  6. 向pipeline中添加自定义业务处理handler
  7. 启动客户端,等待连接服务端,同时将异步改为同步
  8. 关闭通道和关闭连接池
    代码实现:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Netty客户端
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//1. 创建线程组
EventLoopGroup group = new NioEventLoopGroup();
//2. 创建客户端启动助手
Bootstrap bootstrap = new Bootstrap();
//3. 设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class)//4. 设置服务端通道实现为NIO
.handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通
道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws
Exception {
//6. 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyClientHandle());
}
});
//7. 启动客户端, 等待连接服务端, 同时将异步改为同步
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
//8. 关闭通道和关闭连接池
future.channel().closeFuture().sync();
group.shutdownGracefully();
}
}

自定义客户端handle

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* 客户端处理类
*/
public class NettyClientHandle implements ChannelInboundHandler {
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端",
CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务端发来消息:" +
byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws
Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}

3.5 Netty异步模型

3.5.1 基本介绍

异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。

Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得IO 操作结果. Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即 : Future-Listener 机制)

3.5.2 Future 和Future-Listener

  1. Future
    表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,ChannelFuture 是他的一个子接口. ChannelFuture 是一个接口 ,可以添加监听器,当监听的事件发生时,就会通知到监听器
    当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作。
    常用方法有:
  • sync 方法, 阻塞等待程序结果反回
  • isDone 方法来判断当前操作是否完成;
  • isSuccess 方法来判断已完成的当前操作是否成功;
  • getCause 方法来获取已完成的当前操作失败的原因;
  • isCancelled 方法来判断已完成的当前操作是否被取消;
  • addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器
  1. Future-Listener 机制
    给Future添加监听器,监听操作结果
    代码实现:
ChannelFuture future = bootstrap.bind(9999);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好
呀,我是Netty客户端", CharsetUtil.UTF_8));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("数据发送成功.");
} else {
System.out.println("数据发送失败.");
}
}
});

4.Netty高级应用

4.1 Netty编解码器

4.1.1 Java的编解码

  1. 编码(Encode)称为序列化, 它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
  2. 解码(Decode)称为反序列化,它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。

    java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。
    Java序列化目的:1.网络传输。2.对象持久化。
    Java序列化缺点:1.无法跨语言。 2.序列化后码流太大。3.序列化性能太低。
    Java序列化仅仅是Java编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框架,这些编解码框架实现消息的高效序列化。

4.1.2 Netty编解码器

  1. 概念
    在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换。网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码。
    对于Netty而言,编解码器由两部分组成:编码器、解码器。
    解码器:负责将消息从字节或其他序列形式转成指定的消息对象。
    编码器:将消息对象转成字节或其他序列形式在网络上传输。
    Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。
    Netty里面的编解码: 解码器:负责处理“入站 InboundHandler”数据。 编码器:负责“出站OutboundHandler” 数据。
  2. 解码器(Decoder)
    解码器负责 解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder

    抽象解码器
  • ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节
  • ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder
  • MessageToMessageDecoder: 用于从一种消息解码为另外一种消息(例如POJO到POJO)
    核心方法:
decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)

代码实现:
解码器:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/*** 消息解码-可以将字符串消息进行在进行解码. 只有消息入站时才会进行解码 */
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
System.out.println("正在进行消息解码");
out.add(in.toString(CharsetUtil.UTF_8));
}
}

通道读取方法:

/**
* 通道读取事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
System.out.println("客户端发送过来的消息:" + msg);
}

启动类:

protected void initChannel(SocketChannel ch) throws Exception {
//8. 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new MessageDecoder());//添加解码器
ch.pipeline().addLast(new NettyServerHandler());
}
  1. 编码器(Encoder)
    与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现ChannelOutboundHandler接口。

    抽象编码器
    MessageToByteEncoder: 将消息转化成字节
    MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO)
    核心方法:
    encode(ChannelHandlerContext ctx, String msg, List<Object> out)
    代码实现:
    编码器:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/*** 编码器 */
public class MessageEncoder extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg,
List<Object> out) throws Exception {
System.out.println("消息进行消息编码");
out.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
}
}

消息发送:

/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture future = ctx.writeAndFlush("你好呀.我是Netty客户端");
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception
{
if (future.isSuccess()) {
System.out.println("数据发送成功!");
} else {
System.out.println("数据发送失败!");
}
}
});
}

启动类:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
//6. 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new MessageDecoder());//添加解码器
ch.pipeline().addLast(new MessageEncoder());//添加编码器
ch.pipeline().addLast(new NettyClientHandler());
}
  1. 编码解码器Codec
    编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理。

    Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类
    ByteToMessageCodec ,MessageToMessageCodec都继承与此类.
    代码实现:
/**
* 编解码器
*/
public class MessageCoder extends MessageToMessageCodec {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List out)
throws Exception {
System.out.println("正在进行消息编码");
String str = (String) msg;
out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
}
@Override
protected void decode(ChannelHandlerContext ctx, Object msg, List out)
throws Exception {
System.out.println("正在进行消息解码");
ByteBuf byteBuf = (ByteBuf) msg;
out.add(byteBuf.toString(CharsetUtil.UTF_8));
}
}

启动类:

protected void initChannel(SocketChannel ch) throws Exception {
//8. 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new MessageCoder());//添加编解码器
ch.pipeline().addLast(new NettyServerHandler());
}

4.2 Netty案例-群聊天室

案例要求:

  1. 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯
  2. 实现多人群聊
  3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
  4. 客户端:可以发送消息给其它所有用户,同时可以接受其它用户发送的消息

4.2.1 聊天室服务端编写

  1. NettyChatServer
import com.lagou.demo.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 聊天室服务端
*/
public class NettyChatServer {
//端口号
private int port;
public NettyChatServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
//1. 创建bossGroup线程组: 处理网络事件--连接事件
EventLoopGroup bossGroup = null;
//2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
EventLoopGroup workerGroup = null;
try {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
//3. 创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//4. 设置bossGroup线程组和workerGroup线程组
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //5. 设置服务端通道
实现为NIO
.option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
.childOption(ChannelOption.SO_KEEPALIVE,
Boolean.TRUE)//6. 参数设置
.childHandler(new ChannelInitializer<SocketChannel>() {
//7. 创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws
Exception {
//8. 向pipeline中添加自定义业务处理handler
//添加编解码器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
// todo
ch.pipeline().addLast(new
NettyChatServerHandler());
}
});
//9. 启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = serverBootstrap.bind(port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws
Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
System.out.println("聊天室服务端启动成功.");
//10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyChatServer(9998).run();
}
}
  1. NettyChatServerHandle
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.ArrayList;
import java.util.List;
/**
* 聊天室业务处理类
*/
public class NettyChatServerHandler extends
SimpleChannelInboundHandler<String> {
public static List<Channel> channelList = new ArrayList<>();
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//当有新的客户端连接的时候, 将通道放入集合
channelList.add(channel);
System.out.println("[Server]:" +
channel.remoteAddress().toString().substring(1) + "在线.");
}
/**
* 通道未就绪--channel下线
*
*@param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
Channel channel = ctx.channel();
//当有客户端断开连接的时候,就移除对应的通道
channelList.remove(channel);
System.out.println("[Server]:" +
channel.remoteAddress().toString().substring(1) + "下线.");
}
/**
* 通道读取事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg)
throws Exception {
//当前发送消息的通道, 当前发送的客户端连接
Channel channel = ctx.channel();
for (Channel channel1 : channelList) {
//排除自身通道
if (channel != channel1) {
channel1.writeAndFlush("[" +
channel.remoteAddress().toString().substring(1)
+ "]说:" + msg);
}
}
}
/**
* 异常处理事件
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
Channel channel = ctx.channel();
//移除集合
channelList.remove(channel);
System.out.println("[Server]:" +
channel.remoteAddress().toString().substring(1) + "异常.");
}
}

4.2.2 聊天室客户端编写

  1. NettyChatClient
import com.lagou.demo.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
* 聊天室的客户端
*/
public class NettyChatClient {
private String ip;//服务端IP
private int port;//服务端端口号
public NettyChatClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void run() throws InterruptedException {
//1. 创建线程组
EventLoopGroup group = null;
try {
group = new NioEventLoopGroup();
//2. 创建客户端启动助手
Bootstrap bootstrap = new Bootstrap();
//3. 设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO
.handler(new ChannelInitializer<SocketChannel>() { //5.
创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws
Exception {
//6. 向pipeline中添加自定义业务处理handler
//添加编解码器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
//添加客户端的处理类
ch.pipeline().addLast(new
NettyChatClientHandler());
}
});
//7. 启动客户端,等待连接服务端,同时将异步改为同步
ChannelFuture channelFuture = bootstrap.connect(ip,
port).sync();
Channel channel = channelFuture.channel();
System.out.println("-------" +
channel.localAddress().toString().substring(1) + "--------");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//向服务端发送消息
channel.writeAndFlush(msg);
}
//8. 关闭通道和关闭连接池
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyChatClient("127.0.0.1", 9998).run();
}
}
  1. NettyChatClientHandle
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 聊天室处理类
*/
public class NettyChatClientHandler extends
SimpleChannelInboundHandler<String> {
/**
* 通道读取就绪事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg)
throws Exception {
System.out.println(msg);
}
}

4.3 基于Netty的Http服务器开发

4.3.1 介绍

Netty的HTTP协议栈无论在性能还是可靠性上,都表现优异,非常适合在非Web容器的场景下应用,相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。

4.3.2 功能需求

  1. Netty 服务器在 8080 端口监听
  2. 浏览器发出请求 "http://localhost:8080/ "
  3. 服务器可以回复消息给客户端 "Hello! 我是Netty服务器 " ,并对特定请求资源进行过滤.

4.3.3 服务端代码实现

  1. NettyHttpServer
import com.lagou.chat.NettyChatServer;
import com.lagou.chat.NettyChatServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 聊天室服务端
*/
public class NettyHttpServer {
//端口号
private int port;
public NettyHttpServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
//1. 创建bossGroup线程组: 处理网络事件--连接事件
EventLoopGroup bossGroup = null;
//2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
EventLoopGroup workerGroup = null;
try {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
//3. 创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//4. 设置bossGroup线程组和workerGroup线程组
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //5. 设置服务端通道
实现为NIO
.option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置
.childOption(ChannelOption.SO_KEEPALIVE,
Boolean.TRUE)//6. 参数设置
.childHandler(new ChannelInitializer<SocketChannel>() {
//7. 创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws
Exception {
//8. 向pipeline中添加自定义业务处理handler
//添加编解码器
ch.pipeline().addLast(new HttpServerCodec());
// 自定义业务处理类
ch.pipeline().addLast(new
NettyHttpServerHandler());
}
});
//9. 启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = serverBootstrap.bind(port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws
Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
System.out.println("http服务端启动成功.");
//10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyHttpServer(8080).run();
}
}

NettyHttpServerHandle

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
/**
* http服务器处理类
*/
public class NettyHttpServerHandler extends
SimpleChannelInboundHandler<HttpObject> {
/**
* 读取就绪事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
throws Exception {
//1.判断请求是不是HTTP请求
if (msg instanceof HttpRequest) {
DefaultHttpRequest request = (DefaultHttpRequest) msg;
System.out.println("浏览器请求路径:" + request.uri());
if ("/favicon.ico".equals(request.uri())) {
System.out.println("图标不响应");
return;
}
//2.给浏览器进行响应
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello! 我是Netty服务器 ",
CharsetUtil.UTF_8);
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK, byteBuf);
//2.1 设置响应头
response.headers().set(HttpHeaderNames.CONTENT_TYPE,
"text/html;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH,
byteBuf.readableBytes());
ctx.writeAndFlush(response);
}
}
}

4.4 基于Netty的WebSocket开发网页版聊天室

4.4.1 WebSocket简介

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,客户端和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
应用场景十分广泛:

  1. 社交订阅
  2. 协同编辑/编程
  3. 股票基金报价
  4. 体育实况更新
  5. 多媒体聊天
  6. 在线教育

4.4.2 WebSocket和HTTP的区别

http协议是用在应用层的协议,他是基于tcp协议的,http协议建立连接也必须要有三次握手才能发送信息。 http连接分为短连接,长连接,短连接是每次请求都要三次握手才能发送自己的信息。即每一个request对应一个response。长连接是在一定的期限内保持连接。保持TCP连接不断开。客户端与服务器通信,必须要有客户端先发起, 然后服务器返回结果。客户端是主动的,服务器是被动的。 客户端要想实时获取服务端消息就得不断发送长连接到服务端.
WebSocket实现了多路复用,他是全双工通信。在webSocket协议下服务端和客户端可以同时发送信息。 建立了WebSocket连接之后, 服务端可以主动发送信息到客户端。而且信息当中不必在带有head的部分信息了与http的长链接通信来说,这种方式,不仅能降低服务器的压力。而且信息当中也减少了部分多余的信息。

4.4.3 导入基础环境

  1. 将资料中Netty-Springboot工程导入到idea
  2. 相关依赖
<!--整合web模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--整合模板引擎 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
  1. 静态资源
  2. yam配置
server:
port: 8080
resources:
static-locations:
- classpath:/static/
spring:
thymeleaf:
cache: false
checktemplatelocation: true
enabled: true
encoding: UTF-8
mode: HTML5
prefix: classpath:/templates/
suffix: .html

4.4.4 服务端开发

  1. 添加Netty依赖
<!--引入netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
  1. Netty相关配置
netty:
port: 8081
path: /chat
  1. Netty配置类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyConfig {
private int port;//netty监听的端口
private String path;//websocket访问路径
}
  1. NettyWebSocketServer开发
import com.lagou.config.NettyConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* Netty服务器
*/
@Component
public class NettyWebSocketServer implements Runnable {
@Autowired
NettyConfig nettyConfig;
@Autowired
WebSocketChannelInit webSocketChannelInit;
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup();
/**
* 资源关闭--在容器销毁是关闭
*/
@PreDestroy
public void close() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
@Override
public void run() {
try {
//1.创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//2.设置线程组
serverBootstrap.group(bossGroup, workerGroup);
//3.设置参数
serverBootstrap.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(webSocketChannelInit);
//4.启动
ChannelFuture channelFuture =
serverBootstrap.bind(nettyConfig.getPort()).sync();
System.out.println("--Netty服务端启动成功---");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
  1. 通道初始化对象
import com.lagou.config.NettyConfig;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import
io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 通道初始化对象
*/
@Component
public class WebSocketChannelInit extends ChannelInitializer {
@Autowired
NettyConfig nettyConfig;
@Autowired
WebSocketHandler webSocketHandler;
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//对http协议的支持.
pipeline.addLast(new HttpServerCodec());
// 对大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
//post请求分三部分. request line / request header / message body
// HttpObjectAggregator将多个信息转化成单一的request或者response对象
pipeline.addLast(new HttpObjectAggregator(8000));
// 将http协议升级为ws协议. websocket的支持
pipeline.addLast(new
WebSocketServerProtocolHandler(nettyConfig.getPath()));
// 自定义处理handler
pipeline.addLast(webSocketHandler);
}
}
  1. 处理对象
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* 自定义处理类
* TextWebSocketFrame: websocket数据是帧的形式处理
*/
@Component
@ChannelHandler.Sharable //设置通道共享
public class WebSocketHandler extends
SimpleChannelInboundHandler<TextWebSocketFrame> {
public static List<Channel> channelList = new ArrayList<>();
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//当有新的客户端连接的时候, 将通道放入集合
channelList.add(channel);
System.out.println("有新的连接.");
}
/**
* 通道未就绪--channel下线
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
Channel channel = ctx.channel();
//当有客户端断开连接的时候,就移除对应的通道
channelList.remove(channel);
}
/**
* 读就绪事件
*
* @param ctx
* @param textWebSocketFrame
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame textWebSocketFrame) throws Exception {
String msg = textWebSocketFrame.text();
System.out.println("msg:" + msg);
//当前发送消息的通道, 当前发送的客户端连接
Channel channel = ctx.channel();
for (Channel channel1 : channelList) {
//排除自身通道
if (channel != channel1) {
channel1.writeAndFlush(new TextWebSocketFrame(msg));
}
}
}
/**
* 异常处理事件
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
Channel channel = ctx.channel();
//移除集合
channelList.remove(channel);
}
}
  1. 启动类
import com.lagou.netty.NettyWebSocketServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class NettySpringbootApplication implements CommandLineRunner {
@Autowired
NettyWebSocketServer nettyWebSocketServer;
public static void main(String[] args) {
SpringApplication.run(NettySpringbootApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
new Thread(nettyWebSocketServer).start();
}
}
  1. 前端js开发
$(function () {
//这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里
的默认值
var username = "";
while (true) {
//弹出一个输入框,输入一段文字,可以提交
username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
if (username.trim() === "")//如果返回的有内容
{
alert("名称不能输入空")
} else {
$("#username").text(username);
break;
}
}
var ws = new WebSocket("ws://localhost:8081/chat");
ws.onopen = function () {
console.log("连接成功.")
}
ws.onmessage = function (evt) {
showMessage(evt.data);
}
ws.onclose = function (){
console.log("连接关闭")
}
ws.onerror = function (){
console.log("连接异常")
}
function showMessage(message) {
// 张三:你好
var str = message.split(":");
$("#msg_list").append(`<li class="active"}>
<div class="main">
<img class="avatar" width="30"
height="30" src="/img/user.png">
<div>
<div class="user_name">${str[0]}
</div>
<div class="text">${str[1]}</div>
</div>
</div>
</li>`);
// 置底
setBottom();
}
$('#my_test').bind({
focus: function (event) {
event.stopPropagation()
$('#my_test').val('');
$('.arrow_box').hide()
},
keydown: function (event) {
event.stopPropagation()
if (event.keyCode === 13) {
if ($('#my_test').val().trim() === '') {
this.blur()
$('.arrow_box').show()
setTimeout(() => {
this.focus()
}, 1000)
} else {
$('.arrow_box').hide()
//发送消息
sendMsg();
this.blur()
setTimeout(() => {
this.focus()
})
}
}
}
});
$('#send').on('click', function (event) {
event.stopPropagation()
if ($('#my_test').val().trim() === '') {
$('.arrow_box').show()
} else {
sendMsg();
}
})
function sendMsg() {
var message = $("#my_test").val();
$("#msg_list").append(`<li class="active"}>
<div class="main self">
<div class="text">` + message +
`</div>
</div>
</li>`);
$("#my_test").val('');
//发送消息
message = username + ":" + message;
ws.send(message);
// 置底
setBottom();
}
// 置底
function setBottom() {
// 发送消息后滚动到底部
const container = $('.m-message')
const scroll = $('#msg_list')
container.animate({
scrollTop: scroll[0].scrollHeight - container[0].clientHeight +
container.scrollTop() + 100
});
}
});

4.5 Netty中粘包和拆包的解决方案

4.5.1 粘包和拆包简介

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。
TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包
问题。
如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
  2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
  3. 如果D2的数据包比较大, 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
  4. 如果D1, D2的数据包都很大, 服务端分多次才能将D1和D2包接收完全,期间发生多次拆包
    TCP粘包和拆包产生的原因:
    数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

4.5.2 粘包和拆包代码演示

  1. 粘包
    客户端
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端" + i,
CharsetUtil.UTF_8));
}
}

服务端

public int count = 0;
/**
* 通道读取事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送过来的消息:" +
byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("读取次数:"+(++count));
}

运行结果:

服务端一次读取了客户端发送过来的消息,应该读取10次. 因此发生粘包.
2. 拆包
客户端

public void channelActive(ChannelHandlerContext ctx) throws Exception {
//一次发送102400字节数据
byte[] bytes = new byte[102400];
Arrays.fill(bytes, (byte) 10);
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(bytes));
}
}

服务端

public int count = 0;
/**
* 通道读取事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("长度是:" + byteBuf.readableBytes());
System.out.println("读取次数 = " + (++count));
}

运行结果:

当客户端发送的数据包比较大的时候, 读取了18次, 应该读取10次, 则发送拆包事件.
4.5.3 粘包和拆包的解决方法

  1. 业内解决方案
    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。
  • 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
  • 将换行符作为消息结束符
  • 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  • 通过在消息头中定义长度字段来标识消息的总长度
  1. Netty中的粘包和拆包解决方案
    Netty提供了4种解码器来解决,分别如下:
  • 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
  • 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
  • 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
  • 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度
  1. 代码实现
    LineBasedFrameDecoder解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"\n",
CharsetUtil.UTF_8));

DelimiterBasedFrameDecoder解码器

ByteBuf byteBuf =
Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"$",
CharsetUtil.UTF_8));

5.Netty核心源码剖析

暂不写

6. 自定义RPC框架

6.1 分布式架构网络通信

在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI、Hessian、SOAP、ESB和JMS等,它们背后到底是基于什么原理实现的呢

6.1.1 基本原理

要实现网络机器间的通讯,首先得来看看计算机系统网络通信的基本原理,在底层层面去看,网络通信需要做的就是将流从一台计算机传输到另外一台计算机,基于传输协议和网络IO来实现,其中传输协议比较出名的有tcp、udp等等,tcp、udp都是在基于Socket概念上为某类应用场景而扩展出的传输协议,网络IO,主要有bio、nio、aio三种方式,所有的分布式应用通讯都基于这个原理而实现.

6.1.2 什么是RPC

RPC全称为remote procedure call,即远程过程调用。借助RPC可以做到像本地调用一样调用远程服务,是一种进程间的通信方式.
比如两台服务器A和B,A服务器上部署一个应用,B服务器上部署一个应用,A服务器上的应用想调用B服务器上的应用提供的方法,由于两个应用不在一个内存空间,不能直接调用,所以需要通过网络来表达调用的语义和传达调用的数据。需要注意的是RPC并不是一个具体的技术,而是指整个网络远程调用过程。

RPC架构
一个完整的RPC架构里面包含了四个核心的组件,分别是Client,Client Stub,Server以及Server Stub,这个Stub可以理解为存根。

  • 客户端(Client),服务的调用方。
  • 客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
  • 服务端(Server),真正的服务提供者。
  • 服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。
  1. 客户端(client)以本地调用方式(即以接口的方式)调用服务;
  2. 客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体(将消息体对象序列化为二进制);
  3. 客户端通过socket将消息发送到服务端;
  4. 服务端存根( server stub)收到消息后进行解码(将消息对象反序列化);
  5. 服务端存根( server stub)根据解码结果调用本地的服务;
  6. 服务处理
  7. 本地服务执行并将结果返回给服务端存根( server stub);
  8. 服务端存根( server stub)将返回结果打包成消息(将结果消息对象序列化);
  9. 服务端(server)通过socket将消息发送到客户端;
  10. 客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化);
  11. 客户端(client)得到最终结果。
    RPC的目标是要把2、3、4、5、7、8、9、10这些步骤都封装起来。只剩下1、6、11
    注意:无论是何种类型的数据,最终都需要转换成二进制流在网络上进行传输,数据的发送方需要将对象转换为二进制流,而数据的接收方则需要把二进制流再恢复为对象。
    在java中RPC框架比较多,常见的有Hessian、gRPC、Dubbo 等,其实对 于RPC框架而言,核心模块就是通讯和序列化

5.1.3 RMI

Java RMI,即远程方法调用(Remote Method Invocation),一种用于实现远程过程调用(RPCRemote procedure call)的Java API, 能直接传输序列化后的Java对象。它的实现依赖于Java虚拟机,因此它仅支持从一个JVM到另一个JVM的调用。

1.客户端从远程服务器的注册表中查询并获取远程对象引用。
2.桩对象与远程对象具有相同的接口和方法列表,当客户端调用远程对象时,实际上是由相应的桩对象代理完成的。
3.远程引用层在将桩的本地引用转换为服务器上对象的远程引用后,再将调用传递给传输层(Transport),由传输层通过TCP协议发送调用;
4.在服务器端,传输层监听入站连接,它一旦接收到客户端远程调用后,就将这个引用转发给其上层的远程引用层; 5)服务器端的远程引用层将客户端发送的远程应用转换为本地虚拟机的引用后,再将请求传递给骨架(Skeleton); 6)骨架读取参数,又将请求传递给服务器,最后由服务器进行实际的方法调用。
5.如果远程方法调用后有返回值,则服务器将这些结果又沿着“骨架->远程引用层->传输层”向下传递;
6.客户端的传输层接收到返回值后,又沿着“传输层->远程引用层->桩”向上传递,然后由桩来反序列化这些返回值,并将最终的结果传递给客户端程序。
需求分析:

  1. 服务端提供根据ID查询用户的方法
  2. 客户端调用服务端方法, 并返回用户对象
  3. 要求使用RMI进行远程通信
    代码实现:
  4. 服务端
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
/**
* 服务端
*/
public class RMIServer {
public static void main(String[] args) {
try {
//1.注册Registry实例. 绑定端口
Registry registry = LocateRegistry.createRegistry(9998);
//2.创建远程对象
IUserService userService = new UserServiceImpl();
//3.将远程对象注册到RMI服务器上即(服务端注册表上)
registry.rebind("userService", userService);
System.out.println("---RMI服务端启动成功----");
} catch (RemoteException e) {
e.printStackTrace();
}
}
}
  1. 客户端
import com.lagou.rmi.pojo.User;
import com.lagou.rmi.service.IUserService;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
/**
* 客户端
*/
public class RMIClient {
public static void main(String[] args) throws RemoteException,
NotBoundException {
//1.获取Registry实例
Registry registry = LocateRegistry.getRegistry("127.0.0.1", 9998);
//2.通过Registry实例查找远程对象
IUserService userService = (IUserService)
registry.lookup("userService");
User user = userService.getByUserId(2);
System.out.println(user.getId() + "----" + user.getName());
}
}
  1. 接口与实现类
import java.rmi.Remote;
import java.rmi.RemoteException;
public interface IUserService extends Remote {
User getById(int id) throws RemoteException;
}
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Map;
public class UserServiceImpl extends UnicastRemoteObject implements
IUserService {
Map<Object, User> userMap = new HashMap();
protected UserServiceImpl() throws RemoteException {
super();
User user1 = new User();
user1.setId(1);
user1.setName("张三");
User user2 = new User();
user2.setId(2);
user2.setName("李四");
userMap.put(user1.getId(), user1);
userMap.put(user2.getId(), user2);
}
@Override
public User getById(int id) throws RemoteException {
return userMap.get(id);
}
}

5.2 基于Netty实现RPC框架

5.2.1 需求介绍

dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,

  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定,
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据
  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 进行数据通信
  4. 提供者与消费者数据传输使用json字符串数据格式
  5. 提供者使用netty集成spring boot 环境实现
    案例: 客户端远程调用服务端提供根据ID查询user对象的方法.

    5.2.2 代码实现
  6. 服务端代码
  • 注解RpcService
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 对外暴露服务接口
*/
@Target(ElementType.TYPE) // 用于接口和类上
@Retention(RetentionPolicy.RUNTIME)// 在运行时可以获取到
public @interface RpcService {
}
  • 实现类UserServiceImpl
import com.lagou.rpc.api.IUserService;
import com.lagou.rpc.pojo.User;
import com.lagou.rpc.provider.anno.RpcService;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@RpcService
@Service
public class UserServiceImpl implements IUserService {
Map<Object, User> userMap = new HashMap();
@Override
public User getById(int id) {
if (userMap.size() == 0) {
User user1 = new User();
user1.setId(1);
user1.setName("张三");
User user2 = new User();
user2.setId(2);
user2.setName("李四");
userMap.put(user1.getId(), user1);
userMap.put(user2.getId(), user2);
}
return userMap.get(id);
}
}
  • 服务Netty启动类RpcServer
import com.lagou.rpc.provider.handler.RpcServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 启动类
*/
@Service
public class RpcServer implements DisposableBean {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
@Autowired
RpcServerHandler rpcServerHandler;
public void startServer(String ip, int port) {
try {
//1. 创建线程组
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
//2. 创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3. 设置参数
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>
() {
@Override
protected void initChannel(SocketChannel
channel) throws Exception {
ChannelPipeline pipeline =
channel.pipeline();
//添加String的编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//业务处理类
pipeline.addLast(rpcServerHandler);
}
});
//4.绑定端口
ChannelFuture sync = serverBootstrap.bind(ip, port).sync();
System.out.println("==========服务端启动成功==========");
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}
@Override
public void destroy() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}

服务业务处理类RpcServerHandler

import com.alibaba.fastjson.JSON;
import com.lagou.rpc.common.RpcRequest;
import com.lagou.rpc.common.RpcResponse;
import com.lagou.rpc.provider.anno.RpcService;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 服务端业务处理类
* 1.将标有@RpcService注解的bean缓存
* 2.接收客户端请求
* 3.根据传递过来的beanName从缓存中查找到对应的bean
* 4.解析请求中的方法名称. 参数类型 参数信息
* 5.反射调用bean的方法
* 6.给客户端进行响应
*/
@Component
@ChannelHandler.Sharable
public class RpcServerHandler extends
SimpleChannelInboundHandler<String> implements ApplicationContextAware {
private static final Map SERVICE_INSTANCE_MAP = new
ConcurrentHashMap();
/**
* 1.将标有@RpcService注解的bean缓存
*
* @param applicationContext
* @throws BeansException
* */
@Override
public void setApplicationContext(ApplicationContext
applicationContext) throws BeansException {
Map<String, Object> serviceMap =
applicationContext.getBeansWithAnnotation(RpcService.class);
if (serviceMap != null && serviceMap.size() > 0) {
Set<Map.Entry<String, Object>> entries =
serviceMap.entrySet();
for (Map.Entry<String, Object> item : entries) {
Object serviceBean = item.getValue();
if (serviceBean.getClass().getInterfaces().length == 0)
{
throw new RuntimeException("服务必须实现接口");
}
//默认取第一个接口作为缓存bean的名称
String name = serviceBean.getClass().getInterfaces()
[0].getName();
SERVICE_INSTANCE_MAP.put(name, serviceBean);
}
}
}
/**
* 通道读取就绪事件
*
* @param channelHandlerContext
* @param msg
* * @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext
channelHandlerContext, String msg) throws Exception {
//1.接收客户端请求- 将msg转化RpcRequest对象
RpcRequest rpcRequest = JSON.parseObject(msg, RpcRequest.class);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(rpcRequest.getRequestId());
try {
//业务处理
rpcResponse.setResult(handler(rpcRequest));
} catch (Exception exception) {
exception.printStackTrace();
rpcResponse.setError(exception.getMessage());
}
//6.给客户端进行响应
channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
}
/**
* 业务处理逻辑
*
* @return
*/
public Object handler(RpcRequest rpcRequest) throws
InvocationTargetException {
// 3.根据传递过来的beanName从缓存中查找到对应的bean
Object serviceBean =
SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
if (serviceBean == null) {
throw new RuntimeException("根据beanName找不到服务,beanName:"
+ rpcRequest.getClassName());
}
//4.解析请求中的方法名称. 参数类型 参数信息
Class<?> serviceBeanClass = serviceBean.getClass();
String methodName = rpcRequest.getMethodName();
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Object[] parameters = rpcRequest.getParameters();
//5.反射调用bean的方法- CGLIB反射调用
FastClass fastClass = FastClass.create(serviceBeanClass);
FastMethod method = fastClass.getMethod(methodName,
parameterTypes);
return method.invoke(serviceBean, parameters);
}
}

启动类ServerBootstrap

import com.lagou.rpc.provider.server.RpcServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
@Autowired
RpcServer rpcServer;
public static void main(String[] args) {
SpringApplication.run(ServerBootstrapApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
rpcServer.startServer("127.0.0.1", 8899);
}
}).start();
}
}
  1. 客户端代码实现
  • 客户端Netty启动类
import com.lagou.rpc.consumer.handler.RpcClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 客户端
* 1.连接Netty服务端
* 2.提供给调用者主动关闭资源的方法
* 3.提供消息发送的方法
*/
public class RpcClient {
private EventLoopGroup group;
private Channel channel;
private String ip;
private int port;
private RpcClientHandler rpcClientHandler = new RpcClientHandler();
private ExecutorService executorService =
Executors.newCachedThreadPool();
public RpcClient(String ip, int port) {
this.ip = ip;
this.port = port;
initClient();
}
/**
* 初始化方法-连接Netty服务端
*/
public void initClient() {
try {
//1.创建线程组
group = new NioEventLoopGroup();
//2.创建启动助手
Bootstrap bootstrap = new Bootstrap();
//3.设置参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel
channel) throws Exception {
ChannelPipeline pipeline =
channel.pipeline();
//String类型编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//添加客户端处理类
pipeline.addLast(rpcClientHandler);
}
});
//4.连接Netty服务端
channel = bootstrap.connect(ip, port).sync().channel();
} catch (Exception exception) {
exception.printStackTrace();
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
}
/**
* 提供给调用者主动关闭资源的方法
*/
public void close() {
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
/**
* 提供消息发送的方法
*/
public Object send(String msg) throws ExecutionException,
InterruptedException {
rpcClientHandler.setRequestMsg(msg);
Future submit = executorService.submit(rpcClientHandler);
return submit.get();
}
}

客户端业务处理类RpcClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.concurrent.Callable;
/**
* 客户端处理类
* 1.发送消息
* 2.接收消息
*/
public class RpcClientHandler extends
SimpleChannelInboundHandler<String> implements Callable {
ChannelHandlerContext context;
//发送的消息
String requestMsg;
//服务端的消息
String responseMsg;
public void setRequestMsg(String requestMsg) {
this.requestMsg = requestMsg;
}
/**
* 通道连接就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws
Exception {
context = ctx;
}
/**
* 通道读取就绪事件
*
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected synchronized void channelRead0(ChannelHandlerContext
channelHandlerContext, String msg) throws Exception {
responseMsg = msg;
//唤醒等待的线程
notify();
}
/**
* 发送消息到服务端
*
* @return
* @throws Exception
*/
@Override
public synchronized Object call() throws Exception {
//消息发送
context.writeAndFlush(requestMsg);
//线程等待
wait();
return responseMsg;
}
}

RPC代理类

import com.alibaba.fastjson.JSON;
import com.lagou.rpc.common.RpcRequest;
import com.lagou.rpc.common.RpcResponse;
import com.lagou.rpc.consumer.client.RpcClient;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
/**
* 客户端代理类-创建代理对象
* 1.封装request请求对象
* 2.创建RpcClient对象
* 3.发送消息
* * 4.返回结果
*/
public class RpcClientProxy {
public static Object createProxy(Class serviceClass) {
return
Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serviceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
//1.封装request请求对象
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
//2.创建RpcClient对象
RpcClient rpcClient = new RpcClient("127.0.0.1",
8899);
try {
//3.发送消息
Object responseMsg =
rpcClient.send(JSON.toJSONString(rpcRequest));
RpcResponse rpcResponse =
JSON.parseObject(responseMsg.toString(), RpcResponse.class);
if (rpcResponse.getError() != null) {
throw new
RuntimeException(rpcResponse.getError());
}
//4.返回结果
Object result = rpcResponse.getResult();
return JSON.parseObject(result.toString(),
method.getReturnType());
} catch (Exception e) {
throw e;
} finally {
rpcClient.close();
}
}
});
}
}

客户端启动类ClientBootStrap

import com.lagou.rpc.api.IUserService;
import com.lagou.rpc.consumer.proxy.RpcClientProxy;
import com.lagou.rpc.pojo.User;
/**
* 测试类
*/
public class ClientBootStrap {
public static void main(String[] args) {
IUserService userService = (IUserService)
RpcClientProxy.createProxy(IUserService.class);
User user = userService.getById(1);
System.out.println(user);
}
}

分布式理论、架构设计(自定义RPC一 NIO NETTY)相关推荐

  1. 分布式 | Dubbo 架构设计详解

    转载自   分布式 | Dubbo 架构设计详解 Dubbo是Alibaba开源的分布式服务框架,它最大的特点是按照分层的方式来架构,使用这种方式可以使各个层之间解耦合(或者最大限度地松耦合).从服务 ...

  2. 分布式事务架构设计原理

    随着业务需求的复杂化,企业应用规模不断扩大,在后端开发中经常会遇到以下问题: 业务的并发要求非常高,对应的业务需要通过微服务拆分,甚至分库分表等架构设计才能满足并发需求,此时业务操作无法在同一个数据库 ...

  3. 软考高级系统架构设计师:论分布式存储系统架构设计

    软考高级系统架构设计师:论分布式存储系统架构设计 一.集群存储技术 二.分布式文件系统 三.网络存储系统 四.P2P网络存储技术 五.提高分布式存储系统可靠性方法 简要说明在分布式存储系统架构设计中所 ...

  4. 金融级分布式数据库架构设计要点

    行业背景 银行业从最初的手工记账到会计电算化,到金融电子化,再到现在的金融科技,可以看到金融与科技的结合越来越紧密,人工智能.大数据.物联网.区块链等新兴技术改变了金融的交易方式,为金融行业的创新前行 ...

  5. 大型分布式网站架构设计与实践

    阅读文本大概需要3分钟. SOA和RPC 随着互联网规模发展,面向服务的体系架构(SOA)成为主流的架构方式,SOA的本质思想是高内聚.低耦合地实现分治,各个系统之间通过服务的方式进行交互,这样保证了 ...

  6. java路由架构_《大型分布式网站架构设计与实践》读书笔记之 服务的路由和负载均衡...

    服务的路由和负载均衡 公共的业务被拆分出来,形成可共用的服务,最大程度的保证了代码和逻辑的复用,避免重复建设,这种设计也被成为SOA(Service-Oriented Architecture) SO ...

  7. Spring Boot+Docker微服务分布式服务架构设计和部署案例

    2019独角兽企业重金招聘Python工程师标准>>> j360-microservice spring-boot+docker微服务设计之j360-microservice:(欢迎 ...

  8. [读书笔记]大型分布式网站架构设计与实践.分布式缓存

    前言:本书是对分布式系统架构涉及到的相关技术的一本科普书籍.由于很难作为开发参考,只能但求了解.所以通篇浅读,对分布式系统进行大致的了解.因为写的非常好,感觉非常有意思,自己也做不出总结.所谓的读书笔 ...

  9. 支付宝分布式事务架构设计草案

    为什么80%的码农都做不了架构师?>>>    1 背景介绍 为了应对快速变化的市场需求.持续增长的业务量,支付宝系统需要基于SOA进行构建与改造,以应对系统规模和复杂性的挑战,更好 ...

最新文章

  1. 从零开始学习springboot之springboot搭建
  2. pyqt5实战之真爱游戏(2048改版)
  3. 怎么改善现有网站为xhtml+CSS
  4. python协程学习——写个并发获取网站标题的工具
  5. 从Swap函数谈加法溢出问题
  6. STM32_DMA 标准初始化设置解释
  7. windows系统下的python环境的搭建
  8. 买游戏来运营_「笔吧评测室」双十一快来了,买游戏本要做好心理准备
  9. linux服务器历险之linux性能监控
  10. Web 应用服务器端渲染入门指南
  11. 15个只有数学老师懂的泪流满面瞬间
  12. zabbix之web监控
  13. 2015计算机二级公共基础知识,2015年计算机二级公共基础知识考点测试题(8)
  14. 诗与远方:无题(八)
  15. 邀请人数排行榜代码 php,成功邀请好友人数排行榜.PDF
  16. hnu 暑期实训之到底买不买
  17. 用树莓派控制WS2812圣诞树灯饰
  18. cocos2d-x学习之旅(九): 2.2 盘古开天辟地,进入游戏世界
  19. 《软件构架实践》10-12章读后感
  20. 【STM32H7的DSP教程】第5章 Matlab简易使用之常用编程语句

热门文章

  1. java虚拟机第三版学习
  2. 【转载】Photoshop快捷键和技巧大全
  3. 湛江财贸学校计算机二级,湛江财贸学校
  4. 通过CRC32爆破修改图片的宽高 ctf-misc图片隐写
  5. C语言初学者应该知道些什么
  6. SQL Server 安装文件挂起错误解决办法
  7. Gson替代方案Moshi使用教程
  8. 最短路径:迪杰斯特拉(Dijkstra)算法图解
  9. 恢复笔记本电脑电池容量的技巧
  10. 7-24 藏尾诗 (20分)