一、消费端Invoker

  1. 还是在Spring的扩展点进入(注解版从ReferenceAnnotationBeanPostProcessor)

    1. DubboNamespaceHandler
    @Override
    public void init() {registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));//2.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
    
    2. ReferenceBean实现FactoryBean,Spring中规定FactoryBean实现类实例化的是getObject()方法返回的对象public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {//省略代码。。。@Overridepublic Object getObject() {//3.return get();}}
    3. ReferenceConfig
    public synchronized T get() {//检查和更新配置checkAndUpdateSubConfigs();if (destroyed) {throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");}if (ref == null) {//4.init();}return ref;
    }
    
    4.
    private void init() {//省略代码。。。构建一个map//创建一个代理类//5.ref = createProxy(map);}
    5.
    private T createProxy(Map<String, String> map) {//是否在jvm中if (shouldJvmRefer(map)) {URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);invoker = REF_PROTOCOL.refer(interfaceClass, url);if (logger.isInfoEnabled()) {logger.info("Using injvm service " + interfaceClass.getName());}} else {urls.clear(); // reference retry init will add url to urls, lead to OOM//点对点,按用户配置直连? 或者按照配置中心的连接if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.String[] us = SEMICOLON_SPLIT_PATTERN.split(url);if (us != null && us.length > 0) {for (String u : us) {URL url = URL.valueOf(u);if (StringUtils.isEmpty(url.getPath())) {url = url.setPath(interfaceName);}// 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {// 将 map 转换为查询字符串,并作为 refer 参数的值添加到url 中urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));} else {// 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),// 比如线程池相关配置。并保留服务提供者的部分配置,比如版本, group,时间戳等// 最后将合并后的配置设置为 url 查询字符串中urls.add(ClusterUtils.mergeUrl(url, map));}}}//没有配置url,从注册中心去获得服务地址} else {// if protocols not injvm checkRegistryif (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){checkRegistry();//URL是 registry://List<URL> us = loadRegistries(false);if (CollectionUtils.isNotEmpty(us)) {for (URL u : us) {URL monitorUrl = loadMonitor(u);if (monitorUrl != null) {map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));}urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));}}if (urls.isEmpty()) {throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");}}}//一个服务提供者if (urls.size() == 1) {//构建一个invoker(Protocol)//Protocol$Adaptive, url是registry, //再加上包装类,所以new QosProtocolWrapper(new ProtocolListenerWrapper(new ProtocolFilterWrapper(new RegisterProtol())))//最终是RegisterProtocol.refer()//6.invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));//多个服务提供者,构建集群} else {List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();URL registryURL = null;for (URL url : urls) {invokers.add(REF_PROTOCOL.refer(interfaceClass, url));if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {registryURL = url; // use last registry url}}if (registryURL != null) { // registry url is available// use RegistryAwareCluster only when register's CLUSTER is availableURL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invokerinvoker = CLUSTER.join(new StaticDirectory(u, invokers));} else { // not a registry url, must be direct invoke.invoker = CLUSTER.join(new StaticDirectory(invokers));}}}if (shouldCheck() && !invoker.isAvailable()) {throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());}if (logger.isInfoEnabled()) {logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());}/*** @since 2.7.0* ServiceData Store*/MetadataReportService metadataReportService = null;if ((metadataReportService = getMetadataReportService()) != null) {URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);metadataReportService.publishConsumer(consumerURL);}// create service proxy//8.return (T) PROXY_FACTORY.getProxy(invoker);
    }
    
    6. RegistryProtocol
    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//根据配置获取注册中心地址  zookeeper://url = URLBuilder.from(url).setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY).build();//ZookeeperRegisteryRegistry registry = registryFactory.getRegistry(url);//接口是不是RegistryService类型,register://if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}//解析group,根据group获取cluster类型  group="a,b" or group="*"Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));String group = qs.get(GROUP_KEY);if (group != null && group.length() > 0) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {return doRefer(getMergeableCluster(), registry, type, url);}}//7.return doRefer(cluster, registry, type, url);
    }
    7.  无非就是zk上获取服务地址,netty连接过去,订阅zk上服务节点的变化
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {//创建一个RegistryDirectoryRegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);directory.setRegistry(registry); //registry -> 连接zk的api   ->获得url地址directory.setProtocol(protocol); //protocol -> DubboProtocol()  -> 建立通信// all attributes of REFER_KEYMap<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());//consumer 地址URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));registry.register(directory.getRegisteredConsumerUrl()); //consumer://}directory.buildRouterChain(subscribeUrl);//订阅zk上节点的变化//9.directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));/******************************************///new MockClusterWrapper(new FailoverCluster())Invoker invoker = cluster.join(directory);ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker;
    }
    8. PROXY_FACTORY 是new StubProxyFactoryWrapper(new JavassistProxyFactory())
    返回一个代理类,最终会执行InvokerInvocationHandler的invoke方法。
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    9. 主要看下订阅节点的步骤RegistryDirectory对象把zk的节点抽象成目录并发布一个订阅public void subscribe(URL url) {setConsumerUrl(url);CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);serviceConfigurationListener = new ReferenceConfigurationListener(this, url);//registry = ZookeeperRegistry, this = RegistryDirectory//10.registry.subscribe(url, this);
    }
    10. FailBackRegistry
    @Override
    public void subscribe(URL url, NotifyListener listener) {super.subscribe(url, listener);removeFailedSubscribed(url, listener);try {//订阅//11.doSubscribe(url, listener);} catch (Exception e) {Throwable t = e;List<URL> urls = getCacheUrls(url);if (CollectionUtils.isNotEmpty(urls)) {notify(url, listener, urls);logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);} else {// If the startup detection is opened, the Exception is thrown directly.boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)&& url.getParameter(Constants.CHECK_KEY, true);boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);} else {logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);}}//失败后续重试addFailedSubscribed(url, listener);}
    }
    
    11.ZookeeperRegistry
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {//省略代码。。。List<URL> urls = new ArrayList<>();for (String path : toCategoriesPath(url)) {//缓存操作ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);if (listeners == null) {zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());listeners = zkListeners.get(url);}ChildListener zkListener = listeners.get(listener);if (zkListener == null) {//这里比较重要, 构建一个ChildListener 的匿名内部类 ,在节点变化时候//在他的childChanged()方法执行ZookeeperRegistry.this.notify()方法  重新构建Invokelisteners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));zkListener = listeners.get(listener);}zkClient.create(path, false);//zk上Configuration;consumer ;router ;provider 的子节点List<String> children = zkClient.addChildListener(path, zkListener);if (children != null) {urls.addAll(toUrlsWithEmpty(url, path, children));}}//第一次启动时候构建invoke, Map<String, Invoke>//并建立连接//12.notify(url, listener, urls);}
    
    12.FailBackRegistry
    @Override
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}try {//异步通知//13.doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed list, retry regularlyaddFailedNotified(url, listener, urls);logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);}
    }
    
    13.一步步找到 RegistryDirectory
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {//代码省略//protocol = 各种wrapper后的DubboProtocol//14.invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);}
    
    14.DubboProtocol
    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {optimizeSerialization(url);// create rpc invoker.//getClients(url)建立连接//15.DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);invokers.add(invoker);return invoker;
    }
    
    15.
    private ExchangeClient[] getClients(URL url) {// whether to share connectionboolean useShareConnect = false;int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);List<ReferenceCountExchangeClient> shareClients = null;// if not configured, connection is shared, otherwise, one connection for one serviceif (connections == 0) {useShareConnect = true;/*** The xml configuration should have a higher priority than properties.*/String shareConnectionsStr = url.getParameter(Constants.SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(Constants.SHARE_CONNECTIONS_KEY,Constants.DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);//默认共享连接,同一个连接通信//16.shareClients = getSharedClient(url, connections);}ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {if (useShareConnect) {clients[i] = shareClients.get(i);} else {clients[i] = initClient(url);}}return clients;
    }
    
    16.
    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {String key = url.getAddress();List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);if (checkClientCanUse(clients)) {batchClientRefIncr(clients);return clients;}//并发检查locks.putIfAbsent(key, new Object());synchronized (locks.get(key)) {clients = referenceClientMap.get(key);// dubbo checkif (checkClientCanUse(clients)) {batchClientRefIncr(clients);return clients;}// connectNum must be greater than or equal to 1connectNum = Math.max(connectNum, 1);// If the clients is empty, then the first initialization is//第一次初始化if (CollectionUtils.isEmpty(clients)) {//17.clients = buildReferenceCountExchangeClientList(url, connectNum);referenceClientMap.put(key, clients);} else {for (int i = 0; i < clients.size(); i++) {ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);// If there is a client in the list that is no longer available, create a new one to replace him.if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {clients.set(i, buildReferenceCountExchangeClient(url));continue;}referenceCountExchangeClient.incrementAndGetCount();}}/*** I understand that the purpose of the remove operation here is to avoid the expired url key* always occupying this memory space.*/locks.remove(key);return clients;}
    }
    
    17.
    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {//构建连接ExchangeClient exchangeClient = initClient(url);return new ReferenceCountExchangeClient(exchangeClient);
    }
    

dubbo核心之消费端的Invoker(五)相关推荐

  1. dubbo核心之服务端的Invoker(四)

    一.服务端Invoker Invoker是一个代理,与dubbo中的SPI一样重要,在ServiceConfig中在调用export方法时会将一个invoker对象传递进去,分析下这个 private ...

  2. dubbo 无法访问消费端_Dubbo最佳实践,我整理了以下9点

    Dubbo服务化,在当前互联网后端开发中,大部分都使用了Dubbo.截止目前github dubbo上,star也将近3万,使用dubbo的公司数量也很可观,Dubbo确实也是一个比较不错的服务化框架 ...

  3. Dubbo学习记录(十七)-服务调用【三】- 服务消费端Invoker的包装

    服务消费端Invoker的包装 服务消费端的Invoker涉及到服务导出流程, 由ReferenceConfigde#get()方法生成一个代理实例Invoker返回: 这次目的的就是 把整个包装链路 ...

  4. dubbo消费端如何找到服务端对象,进行方法调用的

    关于该问题,要从以下几点点出发 消费端如何生成代理对象的 dubbo的ReferenceBean实现了InitializingBean,这是Spring中Bean的生命周期的方法,所以生成代理对象的逻 ...

  5. 源码分析Dubbo服务消费端启动流程

    通过前面文章详解,我们知道Dubbo服务消费者标签dubbo:reference最终会在Spring容器中创建一个对应的ReferenceBean实例,而ReferenceBean实现了Spring生 ...

  6. 一文深入理解Dubbo核心模型Invoker

    本文转载自:一文深入理解Dubbo核心模型Invoker 一.Dubbo中Invoker介绍 为什么说Invoker是Dubbo核心模型呢? Invoker是Dubbo中的实体域,也就是真实存在的.其 ...

  7. dubbo学习总结三 消费端

    消费端跟服务端类似 注意点是dubbo:reference 和服务端的dubbo:service做区分 消费端主要是处理发送过来的请求 转载于:https://www.cnblogs.com/LEEE ...

  8. Dubbo 使用Nacos作为注册中心是,消费端获取不到注册中心服务问题

    1. 问题 服务注册到nacos之后,消费端访问不到注册了的服务. 消费端启动报错如下: org.springframework.beans.factory.BeanCreationException ...

  9. dubbo protocol port 消费者端_springboot整合dubbo设置全局唯一ID进行日志追踪

    击上方蓝色"程序员白楠楠",选择"设为星标" 作者:松下听泉 出处:https://blog.csdn.net/weixin_39427718 1.新建项目 利 ...

最新文章

  1. python可以自学吗-python自学行吗 新手可以自学python吗
  2. linux小知识之终端
  3. 通用大数据架构为什么不适合处理物联网数据?
  4. win10mongodb链接_Windows 10 安装 Mongodb
  5. CUDA——Ubuntu系统上CUDA和cuDNN的安装教程
  6. Linux基本信息查看命令
  7. Adolescent Suicidal Risk Assessment in Clinician-Patient Interaction
  8. 读取Java源文件中字段的注释当做Swagger的字段描述
  9. 生命主题dreamweaver作业静态HTML网页设计——卫生与健康 6页 带视频
  10. 某农商行用户画像项目——模型构建部分
  11. java1.8日期类_JDK1.8-日期使用
  12. matlab数值微分与数值积分
  13. FastQC 配置 及 基本使用
  14. 20201125今日学习
  15. 生物AI插图免费领取
  16. 接入安卓Facebook SDK的AppEvents
  17. 【SLAM】——what(): Pangolin X11: Failed to open X display
  18. 如何将生活中所记录的收入、支出详细收支打印出来
  19. 这可能是最全的计算机编程语言列表了
  20. java的list遍历_【java】list集合遍历的5种方式

热门文章

  1. 如何关闭linux系统的53端口,3种关闭linux系统端口方法
  2. html5实现浏览器自动全屏,[JavaScript] 用html5 js实现浏览器全屏
  3. GitHub使用教程(完整教程)
  4. (2)ITK中迭代器的时间效率
  5. 初学者Nest框架入门学习(一)
  6. 泰国潜水圣地——斯米兰岛近期开放,不去浪还等什么!
  7. LiveGBS国标GB/T28181视频平台支持级联到海康平台大华宇视等第三方国标平台对接政务公安内网国标视频平台
  8. 屏蔽360阻止远程执行变更注册表自启动数据的办法
  9. Lucene使用IKAnalyzer分词实例 及 IKAnalyzer扩展词库
  10. 企业流程优化与IT应用的关系