为什么80%的码农都做不了架构师?>>>   

你真的明白RPC 吗?一起来探究 RPC 的实质

​ 不论你是科班出身还是半路转行,这么优秀的你一定上过小学语文,那么对扩句和缩句你一定不陌生。缩句就是去除各种修饰提炼出一句话的核心,而不失基本的语义。下面来实现一个简易的 rpc 程序探究其实质,进而去理解复杂的 rpc 框架。所谓复杂的框架就是在简单的过程中加入了一些设计装饰将rpc的功能丰富起来,如 dubbo 的 filter、router、loadblance、集群容错、多种 Invoker 、通讯协议等等,这就是一个扩句的过程。

RPC是指远程过程调用,也就是说两台服务器A、B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络去发起一次调用请求获取结果。

​ 无论是市面上主流的 rpc 框架还是小众的 rpc 框架都实现了上述 rpc的语义。【服务治理型:dubbo、dubbox、motan;多语言型:grpc、thrift、avro、protocol buffers】

打一波广告:【博主最近在写一个 java 实现的 rpc 框架 bridge 欢迎关注,考虑Mesh 化】

一、原理

首先用一幅图来简单描述一下 rpc 的调用过程,从 dubbo 官网拿来的,不算是最简单的图,但是也非常简单了,去掉上面的 Registry 和下面的 Monitor 剩下的就是最简单的 rpc 调用,说白了就是一个网络请求。

过程描述:

  1. 启动服务端provider,并向注册中心登记一下自己暴露服务的地址和服务详情
  2. 然后启动消费端consumer, 订阅注册中心的内容,也就是订阅服务,获取服务的详情
  3. 如果服务有变动,注册中心会通知消费端去更新订阅内容,更新服务详情。
  4. 客户端拿到了服务详情,通过网络对服务端发起网络请求,获取结果
  5. 监视器可以获取到服务调用详情和消费详情,但不限于此

OK,原理就是这么简单,接下来根据上面的描述逐步实现。

二、动手实践

下面基于 springboot 来实现上述的过程。

2.1 构建模块

搭建工程和子模块,工程结构如下:

2.2 实现服务端

看下服务端的内容,贴图

把接口定义在 api 模块,consumer 和 provider 模块都要引用到,接口HelloService代码如下

package com.glmapper.simple.api;/*** service interface** @author: Jerry*/
public interface HelloService {/*** service function** @param name* @return*/String hello(String name);
}

然后在 provider 模块实现接口,用自定注解 @SimpleProvider 标识,先看下注解内容

package com.glmapper.simple.provider.annotation;/*** 自定义服务注解** @author Jerry*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
// 标明可被 Spring 扫描
@Component
public @interface SimpleProvider {Class<?> value();
}

注解使用了@Component标识,所以可被 spring 扫描到,接下来看实现类HelloServiceImpl

package com.glmapper.simple.provider.service;/*** service implement class** @author: Jerry*/
@SimpleProvider(HelloService.class)
public class HelloServiceImpl implements HelloService {/*** service function** @param name* @return*/@Overridepublic String hello(String name) {return "Hello! " + name;}
}

在定义一个服务配置的类SimpleProviderProperties,方便通过 application.yml 文件配置,

package com.glmapper.simple.provider.property;/*** provider properties** @author: Jerry*/
public class SimpleProviderProperties {/*** 暴露服务的端口*/private Integer port;public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}
}

到这里基础的类文件就已经结束了,下面开始服务初始化,入口 ProviderInitializer

package com.glmapper.simple.provider;/*** 启动并注册服务** @author Jerry*/
public class ProviderInitializer implements ApplicationContextAware, InitializingBean {private static final Logger LOGGER = LoggerFactory.getLogger(ProviderInitializer.class);private SimpleProviderProperties providerProperties;/*** service registry*/private ServiceRegistry serviceRegistry;/*** store interface and service implement mapping*/private Map<String, Object> handlerMap = new HashMap<>();public ProviderInitializer(SimpleProviderProperties providerProperties, ServiceRegistry serviceRegistry) {this.providerProperties = providerProperties;this.serviceRegistry = serviceRegistry;}@Overridepublic void setApplicationContext(ApplicationContext ctx) throws BeansException {// 获取被 SimpleProvider 注解的 BeanMap<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(SimpleProvider.class);if (MapUtils.isNotEmpty(serviceBeanMap)) {for (Object serviceBean : serviceBeanMap.values()) {String interfaceName = serviceBean.getClass().getAnnotation(SimpleProvider.class).value().getName();handlerMap.put(interfaceName, serviceBean);}}}@Overridepublic void afterPropertiesSet() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();ChannelHandler channelHandler = new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new SimpleDecoder(SimpleRequest.class)).addLast(new SimpleEncoder(SimpleResponse.class)).addLast(new SimpleHandler(handlerMap));}};bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(channelHandler).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);String host = getLocalHost();if (null == host) {LOGGER.error("can't get service address,because address is null");throw new SimpleException("can't get service address,because address is null");}int port = providerProperties.getPort();ChannelFuture future = bootstrap.bind(host, port).sync();LOGGER.debug("server started on port {}", port);if (serviceRegistry != null) {String serverAddress = host + ":" + port;serviceRegistry.register(serverAddress);}future.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}/*** get service host** @return*/private String getLocalHost() {Enumeration<NetworkInterface> allNetInterfaces;try {allNetInterfaces = NetworkInterface.getNetworkInterfaces();} catch (SocketException e) {LOGGER.error("get local address error,cause:", e);return null;}while (allNetInterfaces.hasMoreElements()) {NetworkInterface netInterface = allNetInterfaces.nextElement();Enumeration<InetAddress> addresses = netInterface.getInetAddresses();while (addresses.hasMoreElements()) {InetAddress ip = addresses.nextElement();if (ip instanceof Inet4Address && !ip.isLoopbackAddress() && !ip.getHostAddress().contains(":")) {return ip.getHostAddress();}}}return null;}
}

描述一下这个类做了什么工作:

  • 首先他实现了ApplicationContextAware, InitializingBean这两个 spring 中接口,根据IOC容器初始化的顺序,会依次回调用接口中的setApplicationContextafterPropertiesSet 方法。

    • setApplicationContext方法中获取了容器中被@SimpleProvider标注的类,并将服务接口名和服务实现类绑定,存放到handlerMap中,在@SimpleProvider中有一个 value 属性,是考虑到一个类可以实现多个接口,通过 value 可以指定哪个服务接口,当然也可以定义为数组,处理多个接口
    • afterPropertiesSet 方法中做了两件事:
      • 在服务端开启了一个处理socket请求的线程池,监听和处理服务暴露端口上接受到的请求,指定了一个处理器SimpleHandler
      • 调用ServiceRegistry类的registry方法向 zookeeper 注册服务的地址和端口,这里没有用到协议,只注册了 ip:port

SimpleHandler是一个实现了 nettySimpleChannelInboundHandler的请求处理器类

package com.glmapper.simple.provider.handler;/*** request handler** @author Jerry*/
public class SimpleHandler extends SimpleChannelInboundHandler<SimpleRequest> {private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHandler.class);private final Map<String, Object> handlerMap;public SimpleHandler(Map<String, Object> handlerMap) {this.handlerMap = handlerMap;}@Overridepublic void channelRead0(final ChannelHandlerContext ctx, SimpleRequest request) throws Exception {SimpleResponse response = new SimpleResponse();response.setRequestId(request.getRequestId());try {Object result = handle(request);response.setResult(result);} catch (Throwable t) {response.setError(t);}ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}private Object handle(SimpleRequest request) throws Throwable {String className = request.getClassName();Object serviceBean = handlerMap.get(className);Class<?> serviceClass = serviceBean.getClass();String methodName = request.getMethodName();Class<?>[] parameterTypes = request.getParameterTypes();Object[] parameters = request.getParameters();FastClass serviceFastClass = FastClass.create(serviceClass);FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);return serviceFastMethod.invoke(serviceBean, parameters);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {LOGGER.error("server caught exception", cause);ctx.close();}
}

SimpleHandler基于 netty 的事件驱动模型触发对应的方法,当收到请求事件会调用channelRead0方法,这个方法的作用就是,根据请求参数中的接口名找到对应的实现类调用指定的方法,然后把结果返回。

再瞅瞅ServiceRegistry,入口是ProviderInitializer调用了ServiceRegistryregistry方法

package com.glmapper.simple.provider.registry;/*** connect zookeeper to registry service** @author Jerry*/
public class ServiceRegistry {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);private ZookeeperProperties zookeeperProperties;public ServiceRegistry(ZookeeperProperties zookeeperProperties) {this.zookeeperProperties = zookeeperProperties;}public void register(String data) {if (data != null) {ZooKeeper zk = ZookeeperUtils.connectServer(zookeeperProperties.getAddress(), zookeeperProperties.getTimeout());if (zk != null) {addRootNode(zk);createNode(zk, data);}}}/*** add one zookeeper root node** @param zk*/private void addRootNode(ZooKeeper zk) {try {String registryPath = zookeeperProperties.getRootPath();Stat s = zk.exists(registryPath, false);if (s == null) {zk.create(registryPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (KeeperException | InterruptedException e) {LOGGER.error("zookeeper add root node error,cause:", e);}}private void createNode(ZooKeeper zk, String data) {try {byte[] bytes = data.getBytes(Charset.forName("UTF-8"));String dataPath = zookeeperProperties.getRootPath() + zookeeperProperties.getDataPath();String path = zk.create(dataPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOGGER.debug("create zookeeper node ({} => {})", path, data);} catch (KeeperException | InterruptedException e) {LOGGER.error("create zookeeper node error,cause:", e);}}
}

ServiceRegistry类做的工作比较简单,就是把 服务ip:port注册到 zk 的指定目录下

  • 创建根节点,根节点是个永久节点
  • 在根节点下创建临时的子节点,子节点存储了服务的 ip:port,服务被挂掉对应的子节点就会被干掉

2.3 消费端

消费端内容:

消费端的内容比较少,核心就三个类:ServiceDiscoveryConsumerHandlerConsumerProxy

先看下ServiceDiscovery内容:

package com.glmapper.simple.consumer.discovery;/*** 服务发现:连接ZK,添加watch事件** @author Jerry*/
public class ServiceDiscovery {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);private volatile List<String> nodes = new ArrayList<>();private ZookeeperProperties zookeeperProperties;public ServiceDiscovery(ZookeeperProperties zookeeperProperties) {this.zookeeperProperties = zookeeperProperties;String address = zookeeperProperties.getAddress();int timeout = zookeeperProperties.getTimeout();ZooKeeper zk = ZookeeperUtils.connectServer(address, timeout);if (zk != null) {watchNode(zk);}}public String discover() {String data = null;int size = nodes.size();if (size > 0) {if (size == 1) {data = nodes.get(0);LOGGER.debug("using only node: {}", data);} else {data = nodes.get(ThreadLocalRandom.current().nextInt(size));LOGGER.debug("using random node: {}", data);}}return data;}private void watchNode(final ZooKeeper zk) {try {Watcher childrenNodeChangeWatcher = event -> {if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {watchNode(zk);}};String rootPath = zookeeperProperties.getRootPath();List<String> nodeList = zk.getChildren(rootPath, childrenNodeChangeWatcher);List<String> nodes = new ArrayList<>();for (String node : nodeList) {byte[] bytes = zk.getData(rootPath + "/" + node, false, null);nodes.add(new String(bytes, Charset.forName("UTF-8")));}LOGGER.info("node data: {}", nodes);this.nodes = nodes;} catch (KeeperException | InterruptedException e) {LOGGER.error("节点监控出错,原因:", e);}}
}

这个类的入口是构造器,作用是获取 zk 的地址,然后获取 zk 上的节点信息,这里没有实现服务订阅,也就是说如果 zk 上原本有两个服务,挂掉一个,客户端不会剔除挂掉的服务信息,导致调用失败。

然后是ConsumerProxy,它是一个代理工厂:

package com.glmapper.simple.consumer.proxy;/*** ConsumerProxy** @author Jerry*/
public class ConsumerProxy {private ServiceDiscovery serviceDiscovery;public ConsumerProxy(ServiceDiscovery serviceDiscovery) {this.serviceDiscovery = serviceDiscovery;}@SuppressWarnings("unchecked")public <T> T create(Class<?> interfaceClass) {return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[]{interfaceClass},new SimpleInvocationHandler());}private class SimpleInvocationHandler implements InvocationHandler {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {SimpleRequest request = buildRequest(method, args);String serverAddress = getServerAddress();String[] array = serverAddress.split(":");String host = array[0];int port = Integer.parseInt(array[1]);ConsumerHandler consumerHandler = new ConsumerHandler(host, port);SimpleResponse response = consumerHandler.send(request);if (response.getError() != null) {throw new SimpleException("service invoker error,cause:", response.getError());} else {return response.getResult();}}private SimpleRequest buildRequest(Method method, Object[] args) {SimpleRequest request = new SimpleRequest();request.setRequestId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);return request;}private String getServerAddress() {String serverAddress = null;if (serviceDiscovery != null) {serverAddress = serviceDiscovery.discover();}if (null == serverAddress) {throw new SimpleException("no server address available");}return serverAddress;}}
}

这里有个内部类SimpleInvocationHandler是生产代理的核心,方法的核心是在 SimpleInvocationHandler.invoke()中是调用这两行代码

ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);

发起网络请求,下面看下ConsumerHandler

package com.glmapper.simple.consumer.handler;/*** RPC真正调用客户端** @author Jerry*/
public class ConsumerHandler extends SimpleChannelInboundHandler<SimpleResponse> {private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHandler.class);private int port;private String host;private SimpleResponse response;private CountDownLatch latch = new CountDownLatch(1);public ConsumerHandler(String host, int port) {this.host = host;this.port = port;}@Overridepublic void channelRead0(ChannelHandlerContext ctx, SimpleResponse response) throws Exception {this.response = response;latch.countDown();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {LOGGER.error("client caught exception", cause);ctx.close();}public SimpleResponse send(SimpleRequest request) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();ChannelInitializer<SocketChannel> channelHandler = new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline()// 将 RPC 请求进行编码(为了发送请求).addLast(new SimpleEncoder(SimpleRequest.class))// 将 RPC 响应进行解码(为了处理响应).addLast(new SimpleDecoder(SimpleResponse.class))// 使用 RpcClient 发送 RPC 请求.addLast(ConsumerHandler.this);}};bootstrap.group(group).channel(NioSocketChannel.class).handler(channelHandler).option(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.connect(host, port).sync();future.channel().writeAndFlush(request).sync();latch.await();if (response != null) {future.channel().closeFuture().sync();}return response;} finally {group.shutdownGracefully();}}
}

这个类和服务端的 ProviderHandler 的代码差不多,也是netty通讯类

附一下 GitHub 地址 simple-rpc

转载于:https://my.oschina.net/GinkGo/blog/1834620

你真的明白RPC 吗?一起来探究 RPC 的实质相关推荐

  1. 身为产品经理,你真的明白头脑风暴怎么运用吗?

    身为产品经理,你真的明白头脑风暴怎么运用吗? 头脑风暴法(Brain storming),由美国奥斯本首创,主要由工作小组人员在正常融洽和不受任何限制的气氛中以会议形式进行讨论.座谈,打破常规,积极思 ...

  2. 【RPC】RPC基础(二)RPC协议

    文章目录 RPC核心原理 1. RPC基础 1.2 RPC协议 为什么设计RPC协议 如何设计RPC协议 可扩展协议的设计 思考 RPC核心原理 1. RPC基础 1.2 RPC协议 RPC协议和HT ...

  3. RPC框架(一)RPC简介

    一.概述 二.RPC 2.1.RPC定义 2.2.RPC主要组成部分 三.影响RPC框架性能的因素 四.工业界的 RPC 框架一览 4.1.国内 4.2.国外 五.如何选择RPC框架 一.概述 随着公 ...

  4. linux停止rpc服务,Linux系统安装启动rpc服务,解决Loadrunner监控不到资源问题

    前言:在LoadRunner Controller下添加Unix Resource Graphs时,报错如下: Monitor name :UNIX Resources. Cannot initial ...

  5. JAVA RPC 生产级高可用RPC框架使用分享

    先放出链接,喜欢的给个star:https://gitee.com/a1234567891/koalas-rpc 一:项目介绍 koalas-RPC 个人作品,提供大家交流学习,有意见请私信,欢迎拍砖 ...

  6. 什么是RPC?RPC好处?常用的RPC框架?

    RPC(Remote Procedure Call Protocol)远程过程调用协议.一个通俗的描述是:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象 ...

  7. 笔记本电脑显示rpc服务器不可用,Win7电脑RPC服务器不可用怎么办 RPC服务器不可用解...

    笔记本采购 最近有win7系统用户反馈,电脑提示rpc服务器不可用!Rpc服务器是指远程过程调用协议:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.有时候由于过度优化关闭了r ...

  8. win7系统安装信息服务器不可用怎么办,Win7电脑RPC服务器不可用怎么办 RPC服务器不可用解决方法...

    最近有win7系统用户反馈,电脑提示rpc服务器不可用!Rpc服务器是指远程过程调用协议:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.有时候由于过度优化关闭了rpc服务,致 ...

  9. 为带你搞懂RPC,手写了RPC框架

    如今,分布式系统大行其道,RPC 有着举足轻重的地位.Dubbo.Thrift.gRpc 等框架各领风骚,学习RPC是新手也是老鸟的必修课.本文带你手撸一个rpc-spring-starter,深入学 ...

最新文章

  1. php中的extract函数
  2. python画统计图怎么在右上角表示哪条线代表什么_Python-matplotlib统计图之箱线图漫谈...
  3. java ipv6校验_JS及java验证 IPV6,IPV4地址的 正则表达式 | 学步园
  4. WIN7+wampserver2.4+zend stadio10.6.1配置Xdebug
  5. oracle dbms_crypto,Oracle的dbms_obfuscation_toolkit加密解密数据
  6. stl min函数_std :: min_element()函数以及C ++ STL中的示例
  7. 自动编码机(Autodencoder)
  8. Selenium +Python项目实践(注册流程)
  9. 如何免费下载英文论文
  10. iOS-成为或取消第一响应者
  11. 过招多家大厂提炼的iOS面试心经(答案版)
  12. 模态框间相互传输数据
  13. 【数据结构】CH3 栈和队列
  14. ai是个什么软件,和PS一样么
  15. 【高级篇 / System】(7.0) ❀ 07. HA 下配置核心交换机 (下) ❀ FortiGate 防火墙
  16. mysql建表日期类型
  17. Android商城开发(一)——一次活动页需求引发的危机感
  18. vba 全拼_EXCEL中直接把中文转换成拼音全拼
  19. det曲线_Winform中设置ZedGraph的曲线符号Symbol以及对应关系
  20. 服务器机柜组件是,网络/服务器机柜_42u标准机柜尺寸【怡富机箱机柜厂家】

热门文章

  1. 2013年4月 计算机网络原理答案,2013年4月份自学考试计算机网络原理04741答案
  2. Go 语言开发第一天,我的学习之路从这里开始
  3. CSS改变插入光标颜色
  4. Vue.JS实现垂直方向展开、收缩不定高度模块的JS组件
  5. 基于3D模型的MaskRCNN的训练数据生成
  6. Caffe RPN :error C2220: warning treated as error - no 'object' file generated
  7. BZOJ1565:[NOI2009]植物大战僵尸——题解
  8. Linux基础命令介绍七:网络传输与安全
  9. QD75运动模块使用
  10. 2016中国APP分类排行榜发布暨颁奖晚宴 —— 兰亭修禊少长有王谢 黔香阁暖高见望诸公...