今天我们要来做一道小菜,这道菜就是RPC通讯框架。它使用netty作为原料,fastjson序列化工具作为调料,来实现一个极简的多线程RPC服务框架。
我们暂且命名该RPC框架为rpckids。

食用指南
在告诉读者完整的制作菜谱之前,我们先来试试这个小菜怎么个吃法,好不好吃,是不是吃起来很方便。如果读者觉得很难吃,那后面的菜谱就没有多大意义了,何必花心思去学习制作一门谁也不爱吃的大烂菜呢?
例子中我会使用rpckids提供的远程RPC服务,用于计算斐波那契数和指数,客户端通过rpckids提供的RPC客户端向远程服务传送参数,并接受返回结果,然后呈现出来。你可以使用rpckids定制任意的业务rpc服务。

斐波那契数输入输出比较简单,一个Integer,一个Long。
指数输入有两个值,输出除了计算结果外还包含计算耗时,以纳秒计算。之所以包含耗时,只是为了呈现一个完整的自定义的输入和输出类。
指数服务自定义输入输出类
// 指数RPC的输入
public class ExpRequest {
private int base;
private int exp;

// constructor & getter & setter

}

// 指数RPC的输出
public class ExpResponse {

private long value;
private long costInNanos;// constructor & getter & setter

}
复制代码斐波那契和指数计算处理
public class FibRequestHandler implements IMessageHandler {

private List<Long> fibs = new ArrayList<>();{fibs.add(1L); // fib(0) = 1fibs.add(1L); // fib(1) = 1
}@Override
public void handle(ChannelHandlerContext ctx, String requestId, Integer n) {for (int i = fibs.size(); i < n + 1; i++) {long value = fibs.get(i - 2) + fibs.get(i - 1);fibs.add(value);}// 输出响应ctx.writeAndFlush(new MessageOutput(requestId, "fib_res", fibs.get(n)));
}

}

public class ExpRequestHandler implements IMessageHandler {

@Override
public void handle(ChannelHandlerContext ctx, String requestId, ExpRequest message) {int base = message.getBase();int exp = message.getExp();long start = System.nanoTime();long res = 1;for (int i = 0; i < exp; i++) {res *= base;}long cost = System.nanoTime() - start;// 输出响应ctx.writeAndFlush(new MessageOutput(requestId, "exp_res", new ExpResponse(res, cost)));
}

}
复制代码构建RPC服务器
RPC服务类要监听指定IP端口,设定io线程数和业务计算线程数,然后注册斐波那契服务输入类和指数服务输入类,还有相应的计算处理器。
public class DemoServer {

public static void main(String[] args) {RPCServer server = new RPCServer("localhost", 8888, 2, 16);server.service("fib", Integer.class, new FibRequestHandler()).service("exp", ExpRequest.class, new ExpRequestHandler());server.start();
}

}
复制代码构建RPC客户端
RPC客户端要链接远程IP端口,并注册服务输出类(RPC响应类),然后分别调用20次斐波那契服务和指数服务,输出结果
public class DemoClient {

private RPCClient client;public DemoClient(RPCClient client) {this.client = client;// 注册服务返回类型this.client.rpc("fib_res", Long.class).rpc("exp_res", ExpResponse.class);
}public long fib(int n) {return (Long) client.send("fib", n);
}public ExpResponse exp(int base, int exp) {return (ExpResponse) client.send("exp", new ExpRequest(base, exp));
}public static void main(String[] args) {RPCClient client = new RPCClient("localhost", 8888);DemoClient demo = new DemoClient(client);for (int i = 0; i < 20; i++) {System.out.printf("fib(%d) = %d\n", i, demo.fib(i));}for (int i = 0; i < 20; i++) {ExpResponse res = demo.exp(2, i);System.out.printf("exp2(%d) = %d cost=%dns\n", i, res.getValue(), res.getCostInNanos());}
}

}
复制代码运行
先运行服务器,服务器输出如下,从日志中可以看到客户端链接过来了,然后发送了一系列消息,最后关闭链接走了。
server started @ localhost:8888
connection comes
read a message
read a message

connection leaves
复制代码再运行客户端,可以看到一些列的计算结果都成功完成了输出。
fib(0) = 1
fib(1) = 1
fib(2) = 2
fib(3) = 3
fib(4) = 5

exp2(0) = 1 cost=559ns
exp2(1) = 2 cost=495ns
exp2(2) = 4 cost=524ns
exp2(3) = 8 cost=640ns
exp2(4) = 16 cost=711ns

复制代码牢骚
本以为是小菜一碟,但是编写完整的代码和文章却将近花费了一天的时间,深感写码要比做菜耗时太多了。因为只是为了教学目的,所以在实现细节上还有好多没有仔细去雕琢的地方。如果是要做一个开源项目,力求非常完美的话。至少还要考虑一下几点。

客户端连接池
多服务进程负载均衡
日志输出
参数校验,异常处理
客户端流量攻击
服务器压力极限

如果要参考grpc的话,还得实现流式响应处理。如果还要为了节省网络流量的话,又需要在协议上下功夫。这一大堆的问题还是抛给读者自己思考去吧。
关注公众号「码洞」,发送「RPC」即可获取以上完整菜谱的GitHub开源代码链接。读者有什么不明白的地方,洞主也会一一解答。
下面我们接着讲RPC服务器和客户端精细的制作过程
服务器菜谱
定义消息输入输出格式,消息类型、消息唯一ID和消息的json序列化字符串内容。消息唯一ID是用来客户端验证服务器请求和响应是否匹配。
public class MessageInput {
private String type;
private String requestId;
private String payload;

public MessageInput(String type, String requestId, String payload) {this.type = type;this.requestId = requestId;this.payload = payload;
}public String getType() {return type;
}public String getRequestId() {return requestId;
}// 因为我们想直接拿到对象,所以要提供对象的类型参数
public <T> T getPayload(Class<T> clazz) {if (payload == null) {return null;}return JSON.parseObject(payload, clazz);
}

}

public class MessageOutput {

private String requestId;
private String type;
private Object payload;public MessageOutput(String requestId, String type, Object payload) {this.requestId = requestId;this.type = type;this.payload = payload;
}public String getType() {return this.type;
}public String getRequestId() {return requestId;
}public Object getPayload() {return payload;
}

}
复制代码消息解码器,使用Netty的ReplayingDecoder实现。简单起见,这里没有使用checkpoint去优化性能了,感兴趣的话读者可以参考一下我之前在公众号里发表的相关文章,将checkpoint相关的逻辑自己添加进去。
public class MessageDecoder extends ReplayingDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {String requestId = readStr(in);String type = readStr(in);String content = readStr(in);out.add(new MessageInput(type, requestId, content));
}private String readStr(ByteBuf in) {// 字符串先长度后字节数组,统一UTF8编码int len = in.readInt();if (len < 0 || len > (1 << 20)) {throw new DecoderException("string too long len=" + len);}byte[] bytes = new byte[len];in.readBytes(bytes);return new String(bytes, Charsets.UTF8);
}

}
复制代码消息处理器接口,每个自定义服务必须实现handle方法
public interface IMessageHandler {

void handle(ChannelHandlerContext ctx, String requestId, T message);

}

// 找不到类型的消息统一使用默认处理器处理
public class DefaultHandler implements IMessageHandler {

@Override
public void handle(ChannelHandlerContext ctx, String requesetId, MessageInput input) {System.out.println("unrecognized message type=" + input.getType() + " comes");
}

}
复制代码消息类型注册中心和消息处理器注册中心,都是用静态字段和方法,其实也是为了图方便,写成非静态的可能会优雅一些。
public class MessageRegistry {
private static Map<String, Class<?>> clazzes = new HashMap<>();

public static void register(String type, Class<?> clazz) {clazzes.put(type, clazz);
}public static Class<?> get(String type) {return clazzes.get(type);
}

}

public class MessageHandlers {

private static Map<String, IMessageHandler<?>> handlers = new HashMap<>();
public static DefaultHandler defaultHandler = new DefaultHandler();public static void register(String type, IMessageHandler<?> handler) {handlers.put(type, handler);
}public static IMessageHandler<?> get(String type) {IMessageHandler<?> handler = handlers.get(type);return handler;
}

}
复制代码响应消息的编码器比较简单
@Sharable
public class MessageEncoder extends MessageToMessageEncoder {

@Override
protected void encode(ChannelHandlerContext ctx, MessageOutput msg, List<Object> out) throws Exception {ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer();writeStr(buf, msg.getRequestId());writeStr(buf, msg.getType());writeStr(buf, JSON.toJSONString(msg.getPayload()));out.add(buf);
}private void writeStr(ByteBuf buf, String s) {buf.writeInt(s.length());buf.writeBytes(s.getBytes(Charsets.UTF8));
}

}
复制代码好,接下来进入关键环节,将上面的小模小块凑在一起,构建一个完整的RPC服务器框架,这里就需要读者有必须的Netty基础知识了,需要编写Netty的事件回调类和服务构建类。
@Sharable
public class MessageCollector extends ChannelInboundHandlerAdapter {
// 业务线程池
private ThreadPoolExecutor executor;

public MessageCollector(int workerThreads) {// 业务队列最大1000,避免堆积// 如果子线程处理不过来,io线程也会加入处理业务逻辑(callerRunsPolicy)BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);// 给业务线程命名ThreadFactory factory = new ThreadFactory() {AtomicInteger seq = new AtomicInteger();@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("rpc-" + seq.getAndIncrement());return t;}};// 闲置时间超过30秒的线程自动销毁this.executor = new ThreadPoolExecutor(1, workerThreads, 30, TimeUnit.SECONDS, queue, factory,new CallerRunsPolicy());
}public void closeGracefully() {// 优雅一点关闭,先通知,再等待,最后强制关闭this.executor.shutdown();try {this.executor.awaitTermination(10, TimeUnit.SECONDS);} catch (InterruptedException e) {}this.executor.shutdownNow();
}@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {// 客户端来了一个新链接System.out.println("connection comes");
}@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {// 客户端走了一个System.out.println("connection leaves");ctx.close();
}@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof MessageInput) {System.out.println("read a message");// 用业务线程池处理消息this.executor.execute(() -> {this.handleMessage(ctx, (MessageInput) msg);});}
}private void handleMessage(ChannelHandlerContext ctx, MessageInput input) {// 业务逻辑在这里Class<?> clazz = MessageRegistry.get(input.getType());if (clazz == null) {// 没注册的消息用默认的处理器处理MessageHandlers.defaultHandler.handle(ctx, input.getRequestId(), input);return;}Object o = input.getPayload(clazz);// 这里是小鲜的瑕疵,代码外观上比较难看,但是大厨表示才艺不够,很无奈// 读者如果感兴趣可以自己想办法解决@SuppressWarnings("unchecked")IMessageHandler<Object> handler = (IMessageHandler<Object>) MessageHandlers.get(input.getType());if (handler != null) {handler.handle(ctx, input.getRequestId(), o);} else {// 用默认的处理器处理吧MessageHandlers.defaultHandler.handle(ctx, input.getRequestId(), input);}
}@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 此处可能因为客户端机器突发重启// 也可能是客户端链接闲置时间超时,后面的ReadTimeoutHandler抛出来的异常// 也可能是消息协议错误,序列化异常// etc.// 不管它,链接统统关闭,反正客户端具备重连机制System.out.println("connection error");cause.printStackTrace();ctx.close();
}

}

public class RPCServer {

private String ip;
private int port;
private int ioThreads; // 用来处理网络流的读写线程
private int workerThreads; // 用于业务处理的计算线程public RPCServer(String ip, int port, int ioThreads, int workerThreads) {this.ip = ip;this.port = port;this.ioThreads = ioThreads;this.workerThreads = workerThreads;
}private ServerBootstrap bootstrap;
private EventLoopGroup group;
private MessageCollector collector;
private Channel serverChannel;// 注册服务的快捷方式
public RPCServer service(String type, Class<?> reqClass, IMessageHandler<?> handler) {MessageRegistry.register(type, reqClass);MessageHandlers.register(type, handler);return this;
}// 启动RPC服务
public void start() {bootstrap = new ServerBootstrap();group = new NioEventLoopGroup(ioThreads);bootstrap.group(group);collector = new MessageCollector(workerThreads);MessageEncoder encoder = new MessageEncoder();bootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipe = ch.pipeline();// 如果客户端60秒没有任何请求,就关闭客户端链接pipe.addLast(new ReadTimeoutHandler(60));// 挂上解码器pipe.addLast(new MessageDecoder());// 挂上编码器pipe.addLast(encoder);// 将业务处理器放在最后pipe.addLast(collector);}});bootstrap.option(ChannelOption.SO_BACKLOG, 100)  // 客户端套件字接受队列大小.option(ChannelOption.SO_REUSEADDR, true) // reuse addr,避免端口冲突.option(ChannelOption.TCP_NODELAY, true) // 关闭小流合并,保证消息的及时性.childOption(ChannelOption.SO_KEEPALIVE, true); // 长时间没动静的链接自动关闭serverChannel = bootstrap.bind(this.ip, this.port).channel();System.out.printf("server started @ %s:%d\n", ip, port);
}public void stop() {// 先关闭服务端套件字serverChannel.close();// 再斩断消息来源,停止io线程池group.shutdownGracefully();// 最后停止业务线程collector.closeGracefully();
}

}
复制代码上面就是完整的服务器菜谱,代码较多,读者如果没有Netty基础的话,可能会看得眼花缭乱。如果你不常使用JDK的Executors框架,阅读起来估计也够呛。如果读者需要相关学习资料,可以找我索取。
客户端菜谱
服务器使用NIO实现,客户端也可以使用NIO实现,不过必要性不大,用同步的socket实现也是没有问题的。更重要的是,同步的代码比较简短,便于理解。所以简单起见,这里使用了同步IO。
定义RPC请求对象和响应对象,和服务器一一对应。
public class RPCRequest {

private String requestId;
private String type;
private Object payload;public RPCRequest(String requestId, String type, Object payload) {this.requestId = requestId;this.type = type;this.payload = payload;
}public String getRequestId() {return requestId;
}public String getType() {return type;
}public Object getPayload() {return payload;
}

}

public class RPCResponse {

private String requestId;
private String type;
private Object payload;public RPCResponse(String requestId, String type, Object payload) {this.requestId = requestId;this.type = type;this.payload = payload;
}public String getRequestId() {return requestId;
}public void setRequestId(String requestId) {this.requestId = requestId;
}public String getType() {return type;
}public void setType(String type) {this.type = type;
}public Object getPayload() {return payload;
}public void setPayload(Object payload) {this.payload = payload;
}

}
复制代码定义客户端异常,用于统一抛出RPC错误
public class RPCException extends RuntimeException {

private static final long serialVersionUID = 1L;public RPCException(String message, Throwable cause) {super(message, cause);
}public RPCException(String message) {super(message);
}public RPCException(Throwable cause) {super(cause);
}

}
复制代码请求ID生成器,简单的UUID64
public class RequestId {

public static String next() {return UUID.randomUUID().toString();
}

}
复制代码响应类型注册中心,和服务器对应
public class ResponseRegistry {
private static Map<String, Class<?>> clazzes = new HashMap<>();

public static void register(String type, Class<?> clazz) {clazzes.put(type, clazz);
}public static Class<?> get(String type) {return clazzes.get(type);
}

}
复制代码好,接下来进入客户端的关键环节,链接管理、读写消息、链接重连都在这里
public class RPCClient {

private String ip;
private int port;
private Socket sock;
private DataInputStream input;
private OutputStream output;public RPCClient(String ip, int port) {this.ip = ip;this.port = port;
}public void connect() throws IOException {SocketAddress addr = new InetSocketAddress(ip, port);sock = new Socket();sock.connect(addr, 5000); // 5s超时input = new DataInputStream(sock.getInputStream());output = sock.getOutputStream();
}public void close() {// 关闭链接try {sock.close();sock = null;input = null;output = null;} catch (IOException e) {}
}public Object send(String type, Object payload) {// 普通rpc请求,正常获取响应try {return this.sendInternal(type, payload, false);} catch (IOException e) {throw new RPCException(e);}
}public RPCClient rpc(String type, Class<?> clazz) {// rpc响应类型注册快捷入口ResponseRegistry.register(type, clazz);return this;
}public void cast(String type, Object payload) {// 单向消息,服务器不得返回结果try {this.sendInternal(type, payload, true);} catch (IOException e) {throw new RPCException(e);}
}private Object sendInternal(String type, Object payload, boolean cast) throws IOException {if (output == null) {connect();}String requestId = RequestId.next();ByteArrayOutputStream bytes = new ByteArrayOutputStream();DataOutputStream buf = new DataOutputStream(bytes);writeStr(buf, requestId);writeStr(buf, type);writeStr(buf, JSON.toJSONString(payload));buf.flush();byte[] fullLoad = bytes.toByteArray();try {// 发送请求output.write(fullLoad);} catch (IOException e) {// 网络异常要重连close();connect();output.write(fullLoad);}if (!cast) {// RPC普通请求,要立即获取响应String reqId = readStr();// 校验请求ID是否匹配if (!requestId.equals(reqId)) {close();throw new RPCException("request id mismatch");}String typ = readStr();Class<?> clazz = ResponseRegistry.get(typ);// 响应类型必须提前注册if (clazz == null) {throw new RPCException("unrecognized rpc response type=" + typ);}// 反序列化json串String payld = readStr();Object res = JSON.parseObject(payld, clazz);return res;}return null;
}private String readStr() throws IOException {int len = input.readInt();byte[] bytes = new byte[len];input.readFully(bytes);return new String(bytes, Charsets.UTF8);
}private void writeStr(DataOutputStream out, String s) throws IOException {out.writeInt(s.length());out.write(s.getBytes(Charsets.UTF8));
}

}
复制代码牢骚重提
本以为是小菜一碟,但是编写完整的代码和文章却将近花费了一天的时间,深感写码要比做菜耗时太多了。因为只是为了教学目的,所以在实现细节上还有好多没有仔细去雕琢的地方。如果是要做一个开源项目,力求非常完美的话。至少还要考虑一下几点。

客户端连接池
多服务进程负载均衡
日志输出
参数校验,异常处理
客户端流量攻击
服务器压力极限

如果要参考grpc的话,还得实现流式响应处理。如果还要为了节省网络流量的话,又需要在协议上下功夫。这一大堆的问题还是抛给读者自己思考去吧。
关注公众号「码洞」,发送「RPC」即可获取以上完整菜谱的GitHub开源代码链接。读者有什么不明白的地方,洞主也会一一解答。

作者:老錢
链接:https://juejin.im/post/5ad2a99ff265da238d51264d
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

大厨小鲜——基于Netty自己动手实现RPC框架相关推荐

  1. 大厨小鲜——基于Netty自己动手编写RPC框架

    今天我们要来做一道小菜,这道菜就是RPC通讯框架.它使用netty作为原料,fastjson序列化工具作为调料,来实现一个极简的多线程RPC服务框架. 我们暂且命名该RPC框架为rpckids. 食用 ...

  2. 大厨小鲜——自己动手实现一个极简Web框架

    上节课我们自己动手制作了一个RPC框架,本节课我们挑战一个稍有难度的一点的任务,手动制作一个Web框架. 首先我们看看这个Web框架使用起来如何简单 Hello World import httpki ...

  3. 基于Protobuf的分布式高性能RPC框架——Navi-Pbrpc

    基于Protobuf的分布式高性能RPC框架--Navi-Pbrpc 二月 8, 2016 1 简介 Navi-pbrpc框架是一个高性能的远程调用RPC框架,使用netty4技术提供非阻塞.异步.全 ...

  4. 带你手写基于 Spring 的可插拔式 RPC 框架(二)整体结构

    前言 上一篇文章中我们已经知道了什么是 RPC 框架和为什么要做一个 RPC 框架了,这一章我们来从宏观上分析,怎么来实现一个 RPC 框架,这个框架都有那些模块以及这些模块的作用. 总体设计 在我们 ...

  5. 基于zeromq的高性能分布式RPC框架Zerorpc 性能测试

    Zeromq 是基于zeromq.gevent和 msgpack开发的分布式RPC框架zerorpc-python.这个框架简单.易用. 1. 安装zeromq 1 2 3 4 5 6 yum -y ...

  6. 带你手写基于 Spring 的可插拔式 RPC 框架(四)代理类的注入与服务启动

    上一章节我们已经实现了从客户端往服务端发送数据并且通过反射方法调用服务端的实现类最后返回给客户端的底层协议. 这一章节我们来实现客户端代理类的注入. 承接上一章,我们实现了多个底层协议,procoto ...

  7. 基于DotNet Core的RPC框架(一) DotBPE.RPC快速开始

    0x00 简介 DotBPE.RPC是一款基于dotnet core编写的RPC框架,而它的爸爸DotBPE,目标是实现一个开箱即用的微服务框架,但是它还差点意思,还仅仅在构思和尝试的阶段.但不管怎么 ...

  8. spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(Motan)

    继上文 :spring整合各种RPC框架(netty.dubbo.dubbox.gRPC.Motan)-续(gRPC) Motan相关介绍? Motan是新浪微博开源的一套基于java开发的RPC框架 ...

  9. 也谈大公司病3——治大国不是烹小鲜

    2019独角兽企业重金招聘Python工程师标准>>> 序 多 数大公司大了后都不可避免会遇到大公司病,机构臃肿,行动缓慢,协调困难,思维僵化.为此,大公司采取了各种各样的做法,建设 ...

最新文章

  1. iOS--OCR图片识别
  2. RNN的优秀变种: LSTM GRU
  3. python避免深度嵌套的if-else_避免嵌套If语句?(Python 2.7)
  4. QMainWindow中的布局管理
  5. 猛男教你写代码_猛男程序员,鼓存储器和1960年代机器代码的取证分析
  6. 深度学习三(PyTorch物体检测实战)
  7. MTK 驱动(85)----RPMB key introduction
  8. Sharding-JDBC水平分库(水平数据库分片测试)_Sharding-Sphere,Sharding-JDBC分布式_分库分表工作笔记011
  9. css 实现兼容各浏览器的渐变效果
  10. 生产环境mysql主主同步主键冲突处理
  11. 9.性能之巅 洞悉系统、企业与云计算 --- 磁盘
  12. 阿里云相关——VPC阿里云专有网络
  13. Python Numpy
  14. 格(Lattice)基础(一)
  15. 三消游戏死局算法的解析
  16. 农信计算机资料录入试题,农村信用社计算机考试试题.docx
  17. K-means clustering using random matrix sparsification(ICML2018)
  18. Spring Boot系列 - 3. SpringBoot项目学习汇总
  19. python 等值线_绘图系列(1):利用matplotlib绘制等值线图
  20. 桌面ICON的红图标

热门文章

  1. 门窗感应器和网关实现欢迎功能
  2. 微设计(www.weidesigner.com)介绍系列文章(二)
  3. 精彩!京东T7开创“新算法宝典”,图文并茂,全新演绎,太酷了
  4. stm32f10x_conf.h 与 stm32f10x.h
  5. 苹果手机变卡了怎么解决_苹果手机QQ闪退怎么办 苹果手机QQ闪退解决方法【详解】...
  6. svchost异常占用cpu排查
  7. linux命令管理GPT分区,Linux硬盘GPT分区和MBR分区
  8. 学习前端 css day3.md
  9. muduo学习笔记:base部分之高性能日志库
  10. i5 12600kf和i7 10700k选哪个