点击关注强哥,还有100多G的面试资料等你来拿

哈喽,大家好,我是强哥。

前些天,一个同事在使用Dubbo的参数回调时,骂骂咧咧的说,Dubbo的这个回调真是奇葩,居然会限制回调次数,自己不得不把callbacks属性值设置的非常大,但是还是会怕服务运行太久后超过回调次数限制,后续的回调就无法正常执行。

突然被他这么一说,我倒是有点奇怪,正常来说Dubbo一个这么牛逼的框架不应该会有这样的限制才对啊。于是,强哥便开始对这个callbacks产生了兴趣,官网和百度找了一番也没有找到具体这个属性的详细解释,哈哈,这不就是又到我亲自研究的时候了吗。

Dubbo参数回调

首先讲一下什么时候Dubbo的参数回调吧,简单的说就是:通过参数回调从服务器端调用客户端逻辑。

参数回调方式与调用本地 callback 或 listener 相同,只需要在 Spring 的配置文件中声明哪个参数是 callback 类型即可。Dubbo 将基于长连接生成反向代理,这样就可以从服务器端调用客户端逻辑。

其实就是说,我们的provider在提供接口的时候,可以设置某个参数是让consumer放回调对象的。当consumer调用provider接口时,provider就可以拿到这个回调对象并进行反向调用(provider回调consumer接口)。

具体配置的代码如下:

<dubbo:service interface="org.apache.dubbo.samples.callback.api.CallbackService" ref="callbackService"connections="1" callbacks="1000"><dubbo:method name="addListener"><dubbo:argument index="1" callback="true"/></dubbo:method>
</dubbo:service>

其中<dubbo:argument index="1" callback="true"/>的index就表示接口CallbackService的第一个参数是用来让consumer传递回调对象的。

callbacks属性

我们从上面的代码中可以看到,在dubbo:service的配置中,有一个属性是callbacks="1000",这个就是上面我同事所谓的“Dubbo对于回调次数的限制”,按他的说法,如果这么配,就相当于回调1000次之后,就会导致Dubbo之后的回调都失败。

这显然不能让强哥相信他理解的正确性,真要是这样,谁还会用Dubbo的回调啊。当然,事实胜于雄辩。直接搞几个测试代码试试就知道了。

测试准备

要说测试,Dubbo这点做得就挺好。在其GitHub仓库的源码目录dubbo-samples下,有参数回调对应的dubbo-samples-callback项目,我们直接就拿这个项目来做测试。

项目结构相对比较简单,具体如下:

这里强哥也要说说项目中使用了内嵌式的ZookeeperEmbeddedZookeeper,不用单独部署ZK,这点确实方便很多,尤其是写这种小示例的时候,太方便了。

不过,我们需要对Dubbo的QoS功能进行关闭,否则在同一台机子上启动多个Dubbo服务会出现端口已被占用的情况。具体需要在callback-provider.xmlcallback-consumer两个文件中添加配置:

<dubbo:application name="callback-provider"><dubbo:parameter key="qos.enable" value="false"/>
</dubbo:application>
<dubbo:application name="callback-consumer"><dubbo:parameter key="qos.enable" value="false"/>
</dubbo:application>

Dubbo的QoS功能是干什么的?

QoS,全称为Quality of Service, 是常见于网络设备中的一个术语 ,例如在路由器中,可以通过Qos动态的调整和控制某些端口的权重,从而优先的保障运行在这些端口上的服务质量。 在Dubbo中,QoS这个概念被用于动态的对服务进行查询和控制。例如对获取当前提供和消费的所有服务,以及对服务进行动态的上下线,即从注册中心上进行注册和反注册操作。

开启该功能需要占用端口,这里为了简化就不配置多个端口了,直接关了就行。

项目中的provider参数回调配置如下:

<dubbo:service interface="org.apache.dubbo.samples.callback.api.CallbackService" ref="callbackService"connections="1" callbacks="1"><dubbo:method name="addListener"><dubbo:argument index="1" callback="true"/></dubbo:method>
</dubbo:service>

注意这里callbacks参数被我修改成了1,也就是说,如果callbacks真的是对回调次数的限制,那么,consumer只要回调一次后,下一次就会失败。

测试

改好配置后,直接开测,首先启动provider,provider在启动的时候会创建CallbackServiceImpl,其代码内容如下:

public class CallbackServiceImpl implements CallbackService {private final Map<String, CallbackListener> listeners = new ConcurrentHashMap<String, CallbackListener>();public CallbackServiceImpl() {Thread t = new Thread(new Runnable() {public void run() {while (true) {try {for (Map.Entry<String, CallbackListener> entry : listeners.entrySet()) {try {entry.getValue().changed(getChanged(entry.getKey()));} catch (Throwable t) {listeners.remove(entry.getKey());}}Thread.sleep(5000); // timely trigger change event} catch (Throwable t) {t.printStackTrace();}}}});t.setDaemon(true);t.start();}public void addListener(String key, CallbackListener listener) {listeners.put(key, listener);listener.changed(getChanged(key)); // send notification for change}private String getChanged(String key) {return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());}}

整体就是在创建时,启动一个线程死循环每隔5秒调用一下从consumer加入进来的回调对象的changed方法。我们不会对provider的代码进行修改,所以启动好放着就行。

重点集中在consumer,强哥这里通过几种情况来对consumer进行测试。

情况1

consumer的代码如下:

public class CallbackConsumerBootstrap {public static void main(String[] args) throws Exception {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/callback-consumer.xml"});context.start();CallbackService callbackService = (CallbackService) context.getBean("callbackService");callbackService.addListener("foo.bar1", new CallbackListener() {public void changed(String msg) {System.out.println("callback1:" + msg);}});System.in.read();}}

内容就是获取到provider提供的CallbackService接口实现,然后调用它的addListener方法,addListener方法的第二个参数就是consumer设置的回调对象,示例中使用了匿名内部类new CallbackListener()

我们直接启动,控制台输出内容如下:

callback1:Changed: 2022-04-06 20:40:07
callback1:Changed: 2022-04-06 20:40:08
callback1:Changed: 2022-04-06 20:40:13
callback1:Changed: 2022-04-06 20:40:18
callback1:Changed: 2022-04-06 20:40:23
callback1:Changed: 2022-04-06 20:40:28
callback1:Changed: 2022-04-06 20:40:33
callback1:Changed: 2022-04-06 20:40:38
……

可见,在设置了callbacks="1"后,回调并没有在第一次:"2022-04-07 14:40:07"执行结束后就失败,直接就可以证明callbacks并不是用来限制回调次数的

情况2

callbacks并不是用来限制回调次数的这点证明之后,那么callbacks到底表示的是什么呢?

会不会是限制回调连接的个数呢?我们继续操作:情况1的步骤保持不变,我们简单改下CallbackConsumerBootstrapcallbackService.addListener的第一个参数keyfoo.banir2,然后再起一个连接。结果如下,同时第一个consumer也还是正常输出日志。

callback1:Changed: 2022-04-06 20:25:15
callback1:Changed: 2022-04-06 20:25:19
callback1:Changed: 2022-04-06 20:25:24
callback1:Changed: 2022-04-06 20:25:29
callback1:Changed: 2022-04-06 20:25:34
callback1:Changed: 2022-04-06 20:25:39
callback1:Changed: 2022-04-06 20:25:44
……

连接数2超过了callbacks的个数限制。可见,**callbacks也不是限制回调连接的个数**。

情况3

我们对情况1的代码进行修改,在CallbackConsumerBootstrap中多添加一个回调:

public class CallbackConsumerBootstrap2 {public static void main(String[] args) throws Exception {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/callback-consumer.xml"});context.start();CallbackService callbackService = (CallbackService) context.getBean("callbackService");//第一个callbackService.addListener("foo.bar1", new CallbackListener() {public void changed(String msg) {System.out.println("callback1:" + msg);}});//第二个callbackService.addListener("foo.bar2", new CallbackListener() {public void changed(String msg) {System.out.println("callback2:" + msg);}});System.in.read();}}

关闭旧的consumer,启动当前的CallbackConsumerBootstrap2,启动后代码就报错了:

Caused by: java.lang.IllegalStateException: interface org.apache.dubbo.samples.callback.api.CallbackListener `s callback instances num exceed providers limit :1 ,current num: 2. The new callback service will not work !!! you can cancle the callback service which exported before. channel :NettyChannel [channel=[id: 0xde91033b, L:/10.0.227.75:63364 - R:/10.0.227.75:20880]]at com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.isInstancesOverLimit(CallbackServiceCodec.java:210)at com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.exportOrunexportCallbackService(CallbackServiceCodec.java:107)at com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeInvocationArgument(CallbackServiceCodec.java:255)at com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeRequestData(DubboCodec.java:180)at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(ExchangeCodec.java:235)at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encode(ExchangeCodec.java:72)at com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec.encode(DubboCountCodec.java:38)at com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter$InternalEncoder.encode(NettyCodecAdapter.java:70)at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:107)... 18 moreat com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:109)at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244)at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75)at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)at com.alibaba.dubbo.common.bytecode.proxy0.addListener(proxy0.java)at org.apache.dubbo.samples.callback.CallbackConsumerBootstrap2.main(CallbackConsumerBootstrap2.java:41)

原因是:callback instances num exceed providers limit :1 ,current num: 2.。也就是说,回调的实例个数2个超过了阈值1个的限制。

由此可见,**callbacks就是用来限制同一个客户端(连接)连接的回调实例个数限制。**

这里需要重点说一下是:

  • 同一个连接

  • 回调实例个数而不是回调个数 怎么证明是实例个数呢?我们接着看。

情况4

还是修改CallbackConsumerBootstrap代码,我们调用两次addListener,但是使用同一个回调实例对象:

public class CallbackConsumerBootstrap3 {public static void main(String[] args) throws Exception {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/callback-consumer.xml"});context.start();CallbackService callbackService = (CallbackService) context.getBean("callbackService");CallbackListener callbackListener = new CallbackListener() {public void changed(String msg) {System.out.println("callback2:" + msg);}};callbackService.addListener("foo.bar1", callbackListener);callbackService.addListener("foo.bar2", callbackListener);System.in.read();}}

运行后,输出如下:

callback2:Changed: 2022-04-06 20:39:47
callback2:Changed: 2022-04-06 20:39:47
callback2:Changed: 2022-04-06 20:39:48
callback2:Changed: 2022-04-06 20:39:48
callback2:Changed: 2022-04-06 20:39:53
callback2:Changed: 2022-04-06 20:39:53
callback2:Changed: 2022-04-06 20:39:58
callback2:Changed: 2022-04-06 20:39:58
callback2:Changed: 2022-04-06 20:40:03
callback2:Changed: 2022-04-06 20:40:03
……

代码没有报错,而且会正常回调两次。由此我们就证明了**callbacks是针对实例的。**

总结+源码分析

综上所述,我们便从4个代码示例中,层层证明,得到: callbacks属性是限制同一个连接的回调实例个数的。而并不是限制consumer的次数。

其实,强哥这次没有从源码的角度来进行分析。主要是为了通过不同的情况,来让大家更好的理解callbacks属性的含义,直接证明,比较一目了然。当然,我们在情况3报错的时候,可以通过错误栈来定位到具体的判断callbacks属性个数的源码位置:CallbackServiceCodecexportOrunexportCallbackService方法:

 if (export) {// one channel can have multiple callback instances, no need to re-export for different instance.if (!channel.hasAttribute(cacheKey)) {if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exporturl);// should destroy resource?Exporter<?> exporter = protocol.export(invoker);// this is used for tracing if instid has published service or not.channel.setAttribute(cacheKey, exporter);logger.info("export a callback service :" + exporturl + ", on " + channel + ", url is: " + url);increaseInstanceCount(channel, countkey);}}}

其中的isInstancesOverLimit方法内容就是判断的地方:

private static boolean isInstancesOverLimit(Channel channel, URL url, String interfaceClass, int instid, boolean isServer) {Integer count = (Integer) channel.getAttribute(isServer ? getServerSideCountKey(channel, interfaceClass) : getClientSideCountKey(interfaceClass));int limit = url.getParameter(Constants.CALLBACK_INSTANCES_LIMIT_KEY, Constants.DEFAULT_CALLBACK_INSTANCES);if (count != null && count >= limit) {//client side errorthrow new IllegalStateException("interface " + interfaceClass + " `s callback instances num exceed providers limit :" + limit+ " ,current num: " + (count + 1) + ". The new callback service will not work !!! you can cancle the callback service which exported before. channel :" + channel);} else {return false;}
}

count为服务端模式的时候,是从getServerSideCountKey中获取的,也就是:

private static String getServerSideCountKey(Channel channel, String interfaceClass) {return Constants.CALLBACK_SERVICE_PROXY_KEY + "." + System.identityHashCode(channel) + "." + interfaceClass + ".COUNT";
}

其中:

  • System.identityHashCode(channel)代表着一个连接

  • interfaceClass就表一个实例 这个的COUNT就是我们说的:每个客户端的一个接口的回调服务实例的个数。

再与callbacks个数比较,来判断是否超过了限制,超过则抛出情况3的异常:

if (count != null && count >= limit) {//client side errorthrow new IllegalStateException("interface " + interfaceClass + " `s callback instances num exceed providers limit :" + limit+ " ,current num: " + (count + 1) + ". The new callback service will not work !!! you can cancle the callback service which exported before. channel :" + channel);
} else {return false;
}

由此,源码也证明结束。

一个小小的属性,也真是要费这么大篇幅来说明。不过强哥也是想让大家能够更好地理解。至少走这么一遍,可以让大家对这个Dubbo的回调机制有个更深入的认识。

小伙伴们如果也想自己试试的话,可以关注「强哥叨逼叨」回复「dubbo回调」获取强哥配置好的测试代码哦。

点击关注强哥,还有100多G的面试资料等你来拿

你该不会也觉得Dubbo参数回调中callbacks属性是用来限制回调次数的吧?相关推荐

  1. loadrunner中的c函数----从参数列表中取参数并与特定字符进行字符串比较。

    今天在www.sztest.net上看到一个关于lr的问题,自己试着解决了一下.主要是想做出一个从参数列表中取参数,并和字符串常量进行比较的功 能.把代码贴到下面.其实是很简单的.问题主要出现在lr_ ...

  2. spring-boot框架dubbo在controlle中r注解@Reference注入service,但是调用方法时候,service报null空指针异常

    spring-boot框架dubbo在controlle中r注解@Reference注入service,但是调用方法时候,service报null空指针异常 参考文章: (1)spring-boot框 ...

  3. 自动装配——@Autowired 构造器,参数,方法,属性都是从容器中获取参数组件的值||自定义组件想要使用Spring容器底层的一些组件 ApplicationContext,BeanFactory

    @Autowired:构造器,参数,方法,属性:都是从容器中获取参数组件的值 * 1).[标注在方法位置]:@Bean+方法参数:参数从容器中获取;默认不写@Autowired效果是一样的:都能自动装 ...

  4. 输入参数_MyBatis中的输入/输出参数

    MyBatis的输入/输出参数 输入参数 MyBatis中的Dao映射配置中的parameter属性表示输入参数类型,它对应Dao接口方法的参数类型有三种: 基本数据类型:如int.char等 < ...

  5. linux线程多参数传递参数,Linux中多线程编程并传递多个参数

    解析Linux中多线程编程并传递多个参数 Linux中多线程编程并传递多个参数实例是本文讲解的内容,不多说,先来看内容. Linux下的多线程编程,并将多个参数传递给线程要执行的函数. 以下是实验程序 ...

  6. android onlescan 参数,Android BLE:从iOS外设广告时,在onLeScan()回调中检索服务UUID

    我正在使用Nexus 4(4.4 kitkat)作为中央和iPad作为外设.外围设备有广告服务.广告包有一些数据(22字节)的服务UUID.当我尝试从 Android扫描外围设备时,iPad外围设备被 ...

  7. 不出现php version网页_php冷知识 - 从命令行参数列表中获取选项

    分享一个php的冷知识 - ,从命令行参数列表中获取选项 用到的函数是getopt 说明 函数签名是这样的 getopt ( string $options [, array $longopts [, ...

  8. JavaScript函数之实际参数对象(arguments) / callee属性 / caller属性 / 递归调用 / 获取函数名称的方法...

    函数的作用域:调用对象 JavaScript中函数的主体是在局部作用域中执行的,该作用域不同于全局作用域.这个新的作用域是通过将调用对象添加到作用域链的头部而创建的(没怎么理解这句话,有理解的亲可以留 ...

  9. url中传递url参数|url中特殊字符、?、=无法解析问题

    url中传递url参数|url中特殊字符&.?.=无法解析问题 1.微信小程序报错:SyntaxError: Unexpected end of JSON input 2.错误场景复现 3.错 ...

最新文章

  1. 污水处理中php是什么药剂,污水处理药剂有哪些?
  2. 财务python招聘_会计、财务、HR等重复性质岗位学习python有什么帮助?
  3. 女生学计算机教学,女生学计算机专业好吗 计算机辅助数学教学之我见
  4. 二叉树的深度优先遍历和广度优先遍历
  5. OpenGL 光照贴图Lighting maps
  6. Promise.all捕获错误
  7. 斗地主AI算法——第十五章の测试模块
  8. C语言表达式5 0的结果是,c语言程序设计期末试题B(含答案)(5页).doc
  9. 使用FileUpload控件上传文件时对文件大小的限制
  10. 统计自然语言处理——信息论基础
  11. mysql查询ip白名单_查询IP白名单
  12. 国外 广告牌_广告牌下一首流行歌曲的分析和预测,第1部分
  13. [week13] 2 - T1
  14. 联想微型计算机拆装图解,联想昭阳e43g拆机教程【详细介绍】
  15. Qlikview脚本生成日历表
  16. ORACLE 19C 单实例数据库安装
  17. 安卓之父安迪·鲁宾:让乔布斯羡慕嫉妒恨的人
  18. 贝叶斯(朴素贝叶斯,正太贝叶斯)及OpenCV源码分析
  19. 入门C语言模板,C语言入门经典-C语言编程
  20. 个人看过较好的电影推荐

热门文章

  1. vue基础指令if和show的区别
  2. Reveal真机配置
  3. 北京中医药大学22春《中医基础理论Z》平时作业1【辅导答案】
  4. woj4764 子矩阵
  5. html前端学习九:web测量工具
  6. 如何用二维码布置作业?文件转二维码在线方法
  7. 蓝牙驱动卸载后自动安装_微软:11月5日起,Win10 PC采用全新自动安装驱动程序方式...
  8. ISO认证咨询解读 ISO9001认证的7大知识点和10大认证误区
  9. Potato提权小结
  10. 计算机组成原理复杂模型机设计思路,计算机组成原理实验报告基本模型机和复杂模型机的设计修订稿-20210605021855.docx-原创力文档...