本文转载自:一文深入理解Dubbo核心模型Invoker


一、Dubbo中Invoker介绍

为什么说Invoker是Dubbo核心模型呢?

Invoker是Dubbo中的实体域,也就是真实存在的。其他模型都向它靠拢或转换成它,它也就代表一个可执行体,可向它发起invoke调用。在服务提供方,Invoker用于调用服务提供类。在服务消费方,Invoker用于执行远程调用。

二、服务提供方的Invoker

在服务提供方中的Invoker是由ProxyFactory创建而来的,Dubbo默认的ProxyFactory实现类为JavassistProxyFactory。

创建Invoker的入口方法getInvoker:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {// 为目标类创建 Wrapperfinal Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type);// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};
}

JavassistProxyFactory创建了一个继承自AbstractProxyInvoker类的匿名对象,并覆写了抽象方法doInvoke。覆写后的doInvoke 逻辑比较简单,仅是将调用请求转发给了Wrapper类的invokeMethod 方法。以及生成 invokeMethod 方法代码和其他一些方法代码。代码生成完毕后,通过 Javassist 生成 Class 对象,最后再通过反射创建 Wrapper 实例。

注:Wapper是一个包装类。主要用于“包裹”目标类,仅可以通过getWapper(Class)方法创建子类。在创建子类过程中,子类代码会对传进来的Class对象进行解析,拿到类方法,类成员变量等信息。而这个包装类持有实际的扩展点实现类。也可以把扩展点的公共逻辑全部移到包装类中,功能上就是作为AOP实现。

创建包装类的构造方法:

public static Wrapper getWrapper(Class<?> c) {    while (ClassGenerator.isDynamicClass(c))c = c.getSuperclass();if (c == Object.class)return OBJECT_WRAPPER;// 从缓存中获取 Wrapper 实例Wrapper ret = WRAPPER_MAP.get(c);if (ret == null) {// 缓存未命中,创建 Wrapperret = makeWrapper(c);// 写入缓存WRAPPER_MAP.put(c, ret);}return ret;
}

在缓存中获取不到Wapper就会进入下面的方法makeWapper:

private static Wrapper makeWrapper(Class<?> c) {// 检测 c 是否为基本类型,若是则抛出异常if (c.isPrimitive())throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);String name = c.getName();ClassLoader cl = ClassHelper.getClassLoader(c);// c1 用于存储 setPropertyValue 方法代码StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");// c2 用于存储 getPropertyValue 方法代码StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");// c3 用于存储 invokeMethod 方法代码StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");// 生成类型转换代码及异常捕捉代码,比如://   DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");// pts 用于存储成员变量名和类型Map<String, Class<?>> pts = new HashMap<String, Class<?>>();// ms 用于存储方法描述信息(可理解为方法签名)及 Method 实例Map<String, Method> ms = new LinkedHashMap<String, Method>();// mns 为方法名列表List<String> mns = new ArrayList<String>();// dmns 用于存储“定义在当前类中的方法”的名称List<String> dmns = new ArrayList<String>();// --------------------------------✨ 分割线1 ✨-------------------------------------// 获取 public 访问级别的字段,并为所有字段生成条件判断语句for (Field f : c.getFields()) {String fn = f.getName();Class<?> ft = f.getType();if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))// 忽略关键字 static 或 transient 修饰的变量continue;// 生成条件判断及赋值语句,比如:// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}// if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");// 生成条件判断及返回语句,比如:// if( $2.equals("name") ) { return ($w)w.name; }c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");// 存储 <字段名, 字段类型> 键值对到 pts 中pts.put(fn, ft);}// --------------------------------✨ 分割线2 ✨-------------------------------------Method[] methods = c.getMethods();// 检测 c 中是否包含在当前类中声明的方法boolean hasMethod = hasMethods(methods);if (hasMethod) {c3.append(" try{");}for (Method m : methods) {if (m.getDeclaringClass() == Object.class)// 忽略 Object 中定义的方法continue;String mn = m.getName();// 生成方法名判断语句,比如:// if ( "sayHello".equals( $2 )c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");int len = m.getParameterTypes().length;// 生成“运行时传入的参数数量与方法参数列表长度”判断语句,比如:// && $3.length == 2c3.append(" && ").append(" $3.length == ").append(len);boolean override = false;for (Method m2 : methods) {// 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同if (m != m2 && m.getName().equals(m2.getName())) {override = true;break;}}// 对重载方法进行处理,考虑下面的方法://    1. void sayHello(Integer, String)//    2. void sayHello(Integer, Integer)// 方法名相同,参数列表长度也相同,因此不能仅通过这两项判断两个方法是否相等。// 需要进一步判断方法的参数类型if (override) {if (len > 0) {for (int l = 0; l < len; l++) {// 生成参数类型进行检测代码,比如:// && $3[0].getName().equals("java.lang.Integer") //    && $3[1].getName().equals("java.lang.String")c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"").append(m.getParameterTypes()[l].getName()).append("\")");}}}// 添加 ) {,完成方法判断语句,此时生成的代码可能如下(已格式化):// if ("sayHello".equals($2) //     && $3.length == 2//     && $3[0].getName().equals("java.lang.Integer") //     && $3[1].getName().equals("java.lang.String")) {c3.append(" ) { ");// 根据返回值类型生成目标方法调用语句if (m.getReturnType() == Void.TYPE)// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");else// return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");// 添加 }, 生成的代码形如(已格式化):// if ("sayHello".equals($2) //     && $3.length == 2//     && $3[0].getName().equals("java.lang.Integer") //     && $3[1].getName().equals("java.lang.String")) {////     w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); //     return null;// }c3.append(" }");// 添加方法名到 mns 集合中mns.add(mn);// 检测当前方法是否在 c 中被声明的if (m.getDeclaringClass() == c)// 若是,则将当前方法名添加到 dmns 中dmns.add(mn);ms.put(ReflectUtils.getDesc(m), m);}if (hasMethod) {// 添加异常捕捉语句c3.append(" } catch(Throwable e) { ");c3.append("     throw new java.lang.reflect.InvocationTargetException(e); ");c3.append(" }");}// 添加 NoSuchMethodException 异常抛出代码c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");// --------------------------------✨ 分割线3 ✨-------------------------------------Matcher matcher;// 处理 get/set 方法for (Map.Entry<String, Method> entry : ms.entrySet()) {String md = entry.getKey();Method method = (Method) entry.getValue();// 匹配以 get 开头的方法if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {// 获取属性名String pn = propertyName(matcher.group(1));// 生成属性判断以及返回语句,示例如下:// if( $2.equals("name") ) { return ($w).w.getName(); }c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());// 匹配以 is/has/can 开头的方法} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {String pn = propertyName(matcher.group(1));// 生成属性判断以及返回语句,示例如下:// if( $2.equals("dream") ) { return ($w).w.hasDream(); }c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());// 匹配以 set 开头的方法} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {Class<?> pt = method.getParameterTypes()[0];String pn = propertyName(matcher.group(1));// 生成属性判断以及 setter 调用语句,示例如下:// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");pts.put(pn, pt);}}// 添加 NoSuchPropertyException 异常抛出代码c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");// --------------------------------✨ 分割线4 ✨-------------------------------------long id = WRAPPER_CLASS_COUNTER.getAndIncrement();// 创建类生成器ClassGenerator cc = ClassGenerator.newInstance(cl);// 设置类名及超类cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);cc.setSuperClass(Wrapper.class);// 添加默认构造方法cc.addDefaultConstructor();// 添加字段cc.addField("public static String[] pns;");cc.addField("public static " + Map.class.getName() + " pts;");cc.addField("public static String[] mns;");cc.addField("public static String[] dmns;");for (int i = 0, len = ms.size(); i < len; i++)cc.addField("public static Class[] mts" + i + ";");// 添加方法代码cc.addMethod("public String[] getPropertyNames(){ return pns; }");cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");cc.addMethod("public String[] getMethodNames(){ return mns; }");cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");cc.addMethod(c1.toString());cc.addMethod(c2.toString());cc.addMethod(c3.toString());try {// 生成类Class<?> wc = cc.toClass();// 设置字段值wc.getField("pts").set(null, pts);wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));wc.getField("mns").set(null, mns.toArray(new String[0]));wc.getField("dmns").set(null, dmns.toArray(new String[0]));int ix = 0;for (Method m : ms.values())wc.getField("mts" + ix++).set(null, m.getParameterTypes());// 创建 Wrapper 实例return (Wrapper) wc.newInstance();} catch (RuntimeException e) {throw e;} catch (Throwable e) {throw new RuntimeException(e.getMessage(), e);} finally {cc.release();ms.clear();mns.clear();dmns.clear();}
}

代码较长,同样注释也很多。大致说一下里面逻辑:

  • **创建c1,c2,c3三个字符串,用于存储类型转换代码和异常捕捉代码,而后pts用于存储成员变量名和类型,ms用于存储方法描述信息(可理解为方法签名)及Method实例,mns为方法名列表,dmns用于存储“定义在当前类中的方法”的名称。**在这里做完了一些初始工作
  • 获取所有public字段,用c1存储条件判断及赋值语句,可以理解为通过c1能够为public字段赋值,而c2是条件判断及返回语句,同样的是得到public字段的值。再用pts存储<字段名,字段类型>。也就是现在能对目标类字段进行操作了,而要操作一些私有字段,是要访问set开头和get开头的方法,同样这些方法也都对应使用c1存set,c2存get,pts存储<属性名,属性类型>
  • 现在到类中的方法,先检查方法中的参数,然后再检查是否有重载的方法。通过c3存储调用目标方法的语句以及方法中可能会抛出的异常,而后用mns集合进行存储方法名,对已经声明的方法存到ms中,未声明但是定义了的方法存在dmns中。
  • **通过ClassGenerator为刚刚生成的代码构建Class类,并通过反射创建对象。**ClassGenerator是Dubbo自己封装的,该类的核心是toClass()的重载方法 toClass(ClassLoader, ProtectionDomain),该方法通过javassist构建Class。

最后在创建完成Wapper类,回到上面的getInvoker方法然后通过下面这条语句

return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);//进入到invokeMethod中public Object invokeMethod(Object instance, String mn, Class<?>[] types, Object[] args) throws NoSuchMethodException {if ("getClass".equals(mn)) {return instance.getClass();} else if ("hashCode".equals(mn)) {return instance.hashCode();} else if ("toString".equals(mn)) {return instance.toString();} else if ("equals".equals(mn)) {if (args.length == 1) {return instance.equals(args[0]);} else {throw new IllegalArgumentException("Invoke method [" + mn + "] argument number error.");}} else {throw new NoSuchMethodException("Method [" + mn + "] not found.");}}
};

到这里Invoker就能实现调用服务提供类的方法了。也就是服务提供类的Invoker实体域创建完成。底层是通过javassist来构建对象的。

三、服务消费方的Invoker

在服务消费方,Invoker用于执行远程调用。Invoker是由 Protocol实现类构建而来。Protocol实现类有很多但是最常用的两个,分别是RegistryProtocol和DubboProtocol。

DubboProtocol的refer方法:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {optimizeSerialization(url);// 创建 DubboInvokerDubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);invokers.add(invoker);return invoker;
}

上述方法较为简单,最重要的一个在于getClients。这个方法用于获取客户端实例,实例类型为ExchangeClient。ExchangeClient实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如NettyClient、MinaClient等,默认情况下,Dubbo使用NettyClient进行通信。每次创建好的Invoker都会添加到invokers这个集合里。也就是可以认为服务消费方的Invoker是一个具有通信能力的Netty客户端

getClients方法:

private ExchangeClient[] getClients(URL url) {// 是否共享连接boolean service_share_connect = false;// 获取连接数,默认为0,表示未配置int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);// 如果未配置 connections,则共享连接if (connections == 0) {service_share_connect = true;connections = 1;}ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {if (service_share_connect) {// 获取共享客户端clients[i] = getSharedClient(url);} else {// 初始化新的客户端clients[i] = initClient(url);}}return clients;
}//进入到获取共享客户端方法private ExchangeClient getSharedClient(URL url) {String key = url.getAddress();// 获取带有“引用计数”功能的 ExchangeClientReferenceCountExchangeClient client = referenceClientMap.get(key);if (client != null) {if (!client.isClosed()) {// 增加引用计数client.incrementAndGetCount();return client;} else {referenceClientMap.remove(key);}}locks.putIfAbsent(key, new Object());synchronized (locks.get(key)) {if (referenceClientMap.containsKey(key)) {return referenceClientMap.get(key);}// 创建 ExchangeClient 客户端ExchangeClient exchangeClient = initClient(url);// 将 ExchangeClient 实例传给 ReferenceCountExchangeClient,这里使用了装饰模式client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);referenceClientMap.put(key, client);ghostClientMap.remove(key);locks.remove(key);return client;}
}//进入到初始化客户端方法private ExchangeClient initClient(URL url) {// 获取客户端类型,默认为 nettyString str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));// 添加编解码和心跳包参数到 url 中url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));// 检测客户端类型是否存在,不存在则抛出异常if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported client type: ...");}ExchangeClient client;try {// 获取 lazy 配置,并根据配置值决定创建的客户端类型if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {// 创建懒加载 ExchangeClient 实例client = new LazyConnectExchangeClient(url, requestHandler);} else {// 创建普通 ExchangeClient 实例client = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service...");}return client;
}//进入到connect方法中,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单,大家自己看一下吧。接下来分析 HeaderExchangeClient 的实现。public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");// 获取 Exchanger 实例,默认为 HeaderExchangeClientreturn getExchanger(url).connect(url, handler);
}//创建HeaderExchangeClient实例 public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {// 这里包含了多个调用,分别如下:// 1. 创建 HeaderExchangeHandler 对象// 2. 创建 DecodeHandler 对象// 3. 通过 Transporters 构建 Client 实例// 4. 创建 HeaderExchangeClient 对象return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}//通过 Transporters 构建 Client 实例public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}ChannelHandler handler;if (handlers == null || handlers.length == 0) {handler = new ChannelHandlerAdapter();} else if (handlers.length == 1) {handler = handlers[0];} else {// 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器handler = new ChannelHandlerDispatcher(handlers);}// 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例return getTransporter().connect(url, handler);
}//创建Netty对象public Client connect(URL url, ChannelHandler listener) throws RemotingException {// 创建 NettyClient 对象return new NettyClient(url, listener);
}

上面的源码大概分一下几个逻辑:

  • 通过refer方法进入DubboInvoker实例的创建,在这个实例中其实serviceType,url,以及invokers都已经是不用去关心的,invokers可以说是存储以及创建好的Invoker。而最关键的在于getClient方法。可以这么认为,现在的Invoker是一个Netty客户端。而在服务提供方的Invoker是一个Wapper类。
  • 在getClient方法里面首先根据connections数量决定是获取共享客户端还是创建新的客户端实例,默认情况下是获取共享客户端,但是获取共享客户端中若缓存中拿不到对应客户端也会新建一个客户端。最终返回的是ExchangeClient,而当前的ExchangeClient也没有通信能力,需要更加底层的Netty客户端。
  • initClient方法首先获取用户配置的客户端类型,默认为Netty,然后检测用户配置的客户端类型是否存在,不存在就要抛出异常,最后根据lazy配置觉得创建什么类型的客户端。LazyConnectExchangeClient代码并不是很复杂,该类会在request方法被调用时通过Exchangers的connect方法创建 ExchangeClient客户端
  • getExchanger会通过SPI加载HeaderExchangeClient实例。最后通过Transporter实现类以及调用Netty的API来创建Netty客户端。最后层层返回,就最后成为了底层为Netty上层为DubboInvoker实例的这样一个类。

RegistryProtocol中的refer:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 取 registry 参数值,并将其设置为协议头url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);// 获取注册中心实例Registry registry = registryFactory.getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// 将 url 查询字符串转为 MapMap<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));// 获取 group 配置String group = qs.get(Constants.GROUP_KEY);if (group != null && group.length() > 0) {if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1|| "*".equals(group)) {// 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑return doRefer(getMergeableCluster(), registry, type, url);}}// 调用 doRefer 继续执行服务引用逻辑return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {// 创建 RegistryDirectory 实例RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);// 设置注册中心和协议directory.setRegistry(registry);directory.setProtocol(protocol);Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());// 生成服务消费者链接URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);// 注册服务消费者,在 consumers 目录下新节点if (!Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));}// 订阅 providers、configurators、routers 等节点数据directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY+ "," + Constants.CONFIGURATORS_CATEGORY+ "," + Constants.ROUTERS_CATEGORY));// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个Invoker invoker = cluster.join(directory);ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker;
}//进入到集群创建Invoker模式@SPI(FailoverCluster.NAME)
public interface Cluster {/*** 合并其中Directory的Invoker为一个Invoker*/@Adaptive<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}//进入到MockerClusterWrapper实现类中public class MockClusterWrapper implements Cluster {private Cluster cluster;public MockClusterWrapper(Cluster cluster) {this.cluster = cluster;}public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new MockClusterInvoker<T>(directory, this.cluster.join(directory));}
}//具体的invoke方法public Result invoke(Invocation invocation) throws RpcException {Result result = null;String value = directory.getUrl().getMethodParameter(invocation.getMethodName(),Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")){//no mockresult = this.invoker.invoke(invocation);} else if (value.startsWith("force")) {if (logger.isWarnEnabled()) {logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " +  directory.getUrl());}//force:direct mockresult = doMockInvoke(invocation, null);} else {//fail-mocktry {result = this.invoker.invoke(invocation);}catch (RpcException e) {if (e.isBiz()) {throw e;} else {if (logger.isWarnEnabled()) {logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " +  directory.getUrl(), e);}//fail:mockresult = doMockInvoke(invocation, e);}}}return result;
}

大致说一下上面的逻辑:

  • 当前的Invoker底层依然是NettyClient,但是此时注册中心是集群搭建模式。所以需要将多个Invoker合并为一个,**这里是逻辑合并的。实际上Invoker底层还是会有多个,只是通过一个集群模式来管理。所以暴露出来的就是一个集群模式的Invoker。**于是进入Cluster.join方法。
  • Cluster是一个通用代理类,会根据URL中的cluster参数值定位到实际的Cluster实现类也就是FailoverCluster。这里用到了@SPI注解,也就是需要ExtensionLoader扩展点加载机制,而该机制在实例化对象是,会在实例化后自动套上Wapper
  • 但是是集群模式所以需要Dubbo中另外一个核心机制——Mock。Mock可以在测试中模拟服务调用的各种异常情况,还可以实现服务降级。在MockerClusterInvoker中,Dubbo先检查URL中是否存在mock参数。(这个参数可以通过服务治理后台Consumer端的屏蔽和容错进行设置或者直接动态设置mock参数值)如果存在force开头,这不发起远程调用直接执行降级逻辑。如果存在fail开头,则在远程调用异常时才会执行降级逻辑。
  • 可以说注册中心为集群模式时,Invoker就会外面多包裹一层mock逻辑。是通过Wapper机制实现的。最终可以在调用或者重试时,每次都通过Dubbo内部的负载均衡机制选出多个Invoker中的一个进行调用

四、总结

到这里Invoker的实现就可以是说完了,总结一下,在服务提供方Invoker是javassist创建的服务类的实例,可以实现调用服务类内部的方法和修改字段。而在服务消费方的Invoker是基于Netty的客户端。最终通过服务消费方Netty客户端获得服务提供方创建的服务类实例。而后消费方为保护服务类就需要为其创建代理类,这样就可以在不实例化服务类情况下安全有效的远程调用服务类内部方法并且得到具体数据了。


本文转载自:一文深入理解Dubbo核心模型Invoker

一文深入理解Dubbo核心模型Invoker相关推荐

  1. 一文带你深入理解JVM内存模型

    一文带你深入理解JVM内存模型 一.JAVA的并发模型 共享内存模型 在共享内存的并发模型里面,线程之间共享程序的公共状态,线程之间通过读写内存中公共状态来进行隐式通信 该内存指的是主内存,实际上是物 ...

  2. 一文详解 Dubbo 注册发现流程

    浅谈Dubbo的注册发现调用流程 前言 笔者从接触分布式开始用的一直是Dubbo和Nacos作为分布式治理的框架,而之前一直没有研究过Dubbo他到底是如何进行服务注册和暴露的,很久之前就想写一篇关于 ...

  3. 智能文档理解:通用文档预训练模型

    预训练模型到底是什么,它是如何被应用在产品里,未来又有哪些机会和挑战? 预训练模型把迁移学习很好地用起来了,让我们感到眼前一亮.这和小孩子读书一样,一开始语文.数学.化学都学,读书.网上游戏等,在脑子 ...

  4. cstring只获取到第一个数_一文讲透 Dubbo 负载均衡之最小活跃数算法

    (给ImportNew加星标,提高Java技能) 作者:why技术(本文来自作者投稿) 本文是对于Dubbo负载均衡策略之一的最小活跃数算法的详细分析.文中所示源码,没有特别标注的地方均为2.6.0版 ...

  5. 通俗理解LDA主题模型

    0 前言 印象中,最开始听说"LDA"这个名词,是缘于rickjin在2013年3月写的一个LDA科普系列,叫LDA数学八卦,我当时一直想看来着,记得还打印过一次,但不知是因为这篇 ...

  6. Dubbo核心知识点

    本文来说下Dubbo核心知识点,也是面试中的重难点. 文章目录 Dubbo是什么 RPC又是什么 说说Dubbo的分层 能说下Dubbo的工作原理吗 为什么要通过代理对象通信 说说服务暴露的流程 说说 ...

  7. LDA通俗理解LDA主题模型

    转自:http://blog.csdn.net/v_july_v/article/details/41209515#t13 通俗理解LDA主题模型 0 前言 印象中,最开始听说"LDA&qu ...

  8. 通俗理解LDA主题模型(转载自 v_JULY_v 大佬)

    通俗理解LDA主题模型 原文:https://blog.csdn.net/v_july_v/article/details/41209515 0 前言 印象中,最开始听说"LDA" ...

  9. 理解 LDA 主题模型

    前言 gamma函数 0 整体把握LDA 1 gamma函数 beta分布 1 beta分布 2 Beta-Binomial 共轭 3 共轭先验分布 4 从beta分布推广到Dirichlet 分布 ...

最新文章

  1. java流类图结构_java I/O 流总结
  2. mnn c++ windows 调用实例
  3. JavaScript的预编译及执行顺序
  4. php sprintf %1,PHP sprintf() 函数的应用(定义和用法)
  5. Codevs 1010 过河卒 2002年NOIP全国联赛普及组
  6. 祖父元素_帮助祖父母建立Skype帐户的UX经验教训
  7. git 查看自己秘钥_gitlab配置ssh密钥及简单使用
  8. 月度开销(信息学奥赛一本通-T1243)
  9. jython语法入门
  10. LeetCode(530)——二叉搜索树的最小绝对差(JavaScript)
  11. 因漏洞Dropbox用户邮件地址被泄露给垃圾邮件发送者
  12. ubuntu重装用户root认证
  13. 移动硬盘插入提示需要格式化RAW_当正常驱动器变成RAW驱动器时怎么修复
  14. 有道翻译API接口的服务开通与使用Python进行接口调用实例演示,有道智云·AI开放平台
  15. java 日历转化-阴历转阳历
  16. 入了giant FCR 3100,纪念一下!¥1800元
  17. js停止setInterval
  18. Allegro_SMT手工焊接辅助程序
  19. 黄芪桂圆枸杞红枣茶的作用
  20. Gurobi运筹学开发教程02:基于系数矩阵和右侧项的python+gurobi建模

热门文章

  1. 秀才第三次进京赶考的故事
  2. 基于DELF(DEep Local Features)深度局部特征的图像检索实践
  3. java学习6:商品管理系统例子
  4. Break or Repair?
  5. Excel表格的打开密码如何设置和取消?
  6. 新媒体运营能做到主管级别吗
  7. ajp8009端口分析
  8. csdn程序竞赛第六期-python题解
  9. python安装包遇到的报错(PyCharm)
  10. 可以删除的android系统自带程序