自定义注解 实现自定义消息

上一次我们学习了java.util.concurrent.Future<T>背后的原理 。 我们还发现, Future<T>通常由库或框架返回。 但是,没有什么可以阻止我们在有意义的情况下自行实现所有功能。 它不是特别复杂,可以显着改善您的设计。 我尽力为我们的示例选择有趣的用例。

JMS(Java消息服务)是用于发送异步消息的标准Java API。 当我们想到JMS时​​,我们立即看到客户端以一发不可收拾的方式向服务器(经纪人)发送消息。 但是在JMS之上实现请求-答复消息传递模式同样普遍。 实现非常简单:您将请求消息(当然是异步地)发送到另一侧的MDB。

MDB处理该请求,然后将答复发送回硬编码的答复队列或客户机选择的任意队列,并与JMSReplyTo属性中的消息一起发送。 第二种情况更有趣。 客户端可以创建一个临时队列,并在发送请求时将其用作回复队列。 这样,每个请求/答复对使用不同的答复队列,因此不需要关联ID,选择器等。

但是有一个问题。 向JMS代理发送消息是简单且异步的。 但是,收到答复要麻烦得多。 您可以实现MessageListener以使用一条消息,也可以使用阻塞MessageConsumer.receive() 。 第一种方法非常重,很难在实践中使用。 第二个失败了异步消息传递的目的。 您还可以按一定间隔轮询回复队列,这听起来更糟。

到现在为止,了解Future抽象您应该有一些设计想法。 如果我们可以发送请求消息并取回Future<T> (代表尚未发送的答复消息)怎么办? Future抽象应该处理所有逻辑,我们可以放心地将其用作未来结果的句柄。 这是用于创建临时队列和发送请求的管道代码:

private <T extends Serializable> Future<T> asynchRequest(ConnectionFactory connectionFactory, Serializable request, String queue) throws JMSException {Connection connection = connectionFactory.createConnection();connection.start();final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);final Queue tempReplyQueue = session.createTemporaryQueue();final ObjectMessage requestMsg = session.createObjectMessage(request);requestMsg.setJMSReplyTo(tempReplyQueue);sendRequest(session.createQueue(queue), session, requestMsg);return new JmsReplyFuture<T>(connection, session, tempReplyQueue);
}

asynchRequest()方法仅将ConnectionFactory带到JMS代理和任意数据。 该对象将使用ObjectMessage发送到queue 。 最后一行至关重要–我们返回自定义的JmsReplyFuture<T> ,它将表示尚未收到的回复。 注意我们如何将临时JMS队列传递给JMSReplyTo属性和Future 。 MDB方面的实现并不那么重要。 不用说是将回复发送回指定队列:

final ObjectMessage reply = session.createObjectMessage(...);
session.createProducer(request.getJMSReplyTo()).send(reply);

因此,让我们深入研究JmsReplyFuture<T> 。 我假设请求和答复都是ObjectMessage 。 使用不同类型的消息不是很困难。 首先让我们看看如何设置从回复通道接收消息:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {//...public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {this.connection = connection;this.session = session;replyConsumer = session.createConsumer(replyQueue);replyConsumer.setMessageListener(this);}@Overridepublic void onMessage(Message message) {//...}}

如您所见, JmsReplyFuture实现了Future<T> (其中T是包装在ObjectMessage的对象的预期类型)和JMS MessageListener 。 在构造函数中,我们只是开始侦听replyQueue 。 根据我们的设计假设,我们知道那里最多会有一条消息,因为回复队列是临时丢弃队列。 在上一篇文章中,我们了解到Future.get()应该在等待结果时阻塞。 另一方面, onMessage()是从某些内部JMS客户端线程/库调用的回调方法。 显然,我们需要一些共享变量/锁,以使等待中的get()知道答复已到达。 最好我们的解决方案应该是轻量级的,并且不引入任何延迟,因此忙于等待volatile变量是一个坏主意。 最初,我虽然使用了Semaphore ,但我将使用它来从onMessage()取消阻塞get() onMessage() 。 但是我仍然需要一些共享变量来保存实际的回复对象。 因此,我想到了使用ArrayBlockingQueue的想法。 当我们知道不会再有一个项目时,使用队列听起来可能很奇怪。 但是,它利用旧的生产者-消费者模式很好地工作: Future.get()是一个消费者,它阻塞了空队列的poll()方法。 另一方面, onMessage()是生产者,将回复消息放入该队列中并立即取消阻塞消费者。 外观如下:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {private final BlockingQueue<T> reply = new ArrayBlockingQueue<>(1);//...@Overridepublic T get() throws InterruptedException, ExecutionException {return this.reply.take();}@Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {final T replyOrNull = reply.poll(timeout, unit);if (replyOrNull == null) {throw new TimeoutException();}return replyOrNull;}@Overridepublic void onMessage(Message message) {final ObjectMessage objectMessage = (ObjectMessage) message;final Serializable object = objectMessage.getObject();reply.put((T) object);//...}}

实施仍未完成,但涵盖了最重要的概念。 请注意, BlockingQueue.poll(long, TimeUnit)方法非常适合Future.get(long, TimeUnit) 。 不幸的是,即使它们来自相同的程序包并且在相同的时间内或多或少地被开发,一种方法在超时时返回null ,而另一种方法应引发异常。 易于修复。

还要注意onMessage()的实现变得多么容易。 我们只是将新收到的消息放在BlockingQueue reply ,而集合将为我们完成所有同步。 我们仍然缺少一些不太重要但仍然重要的细节–取消和清理。 无需赘述,下面是一个完整的实现:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {private static enum State {WAITING, DONE, CANCELLED}private final Connection connection;private final Session session;private final MessageConsumer replyConsumer;private final BlockingQueue<T> reply = new ArrayBlockingQueue<>(1);private volatile State state = State.WAITING;public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {this.connection = connection;this.session = session;replyConsumer = session.createConsumer(replyQueue);replyConsumer.setMessageListener(this);}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {try {state = State.CANCELLED;cleanUp();return true;} catch (JMSException e) {throw Throwables.propagate(e);}}@Overridepublic boolean isCancelled() {return state == State.CANCELLED;}@Overridepublic boolean isDone() {return state == State.DONE;}@Overridepublic T get() throws InterruptedException, ExecutionException {return this.reply.take();}@Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {final T replyOrNull = reply.poll(timeout, unit);if (replyOrNull == null) {throw new TimeoutException();}return replyOrNull;}@Overridepublic void onMessage(Message message) {try {final ObjectMessage objectMessage = (ObjectMessage) message;final Serializable object = objectMessage.getObject();reply.put((T) object);state = State.DONE;cleanUp();} catch (Exception e) {throw Throwables.propagate(e);}}private void cleanUp() throws JMSException {replyConsumer.close();session.close();connection.close();}
}

我使用特殊的State枚举来保存有关状态的信息。 与基于多个标志, null检查等的复杂条件相比,我发现它更具可读性。要记住的第二件事是取消。 幸运的是,它非常简单。 我们基本上关闭了基础会话/连接。 在整个请求/答复消息交换的整个过程中,它必须保持打开状态,否则临时JMS答复队列将消失。 请注意,我们不能轻易通知经纪人/ MDB我们对答复不再感兴趣。 我们只是停止监听它,但是MDB仍将处理请求并尝试将答复发送到不再存在的临时队列。

那么这一切在实践中看起来如何? 假设我们有一个MDB可以接收一个数字并返回一个平方。 假设计算需要一点时间,所以我们提前开始计算,同时做一些工作,然后再取回结果。 这样的设计如下所示:

final Future<Double> replyFuture = asynchRequest(connectionFactory, 7, "square");
//do some more work
final double resp = replyFuture.get();      //49

其中"square"是请求队列的名称。 如果我们重构它并使用依赖注入,我们可以将其进一步简化为:

final Future<Double> replyFuture = calculator.square(7);
//do some more work
final double resp = replyFuture.get();      //49

您知道该设计的最佳选择吗? 即使我们正在利用相当先进的JMS功能,此处也没有JMS代码。 此外,我们稍后可以使用SOAP或GPU将calculator替换为其他实现。 就客户端代码而言,我们仍然使用Future<Double>抽象。 尚未提供计算结果。 根本的机制无关紧要。 那就是抽象之美。

显然,此实现尚未准备好生产(到目前为止)。 但更糟糕的是,它缺少一些基本功能。 我们仍然在某个时候调用阻塞Future.get() 。 而且,无法组成/链接期货(例如, 当响应到达时,发送另一条消息 )或等待最快的期货完成。 耐心一点!

参考: NoBlogDefFound博客中的JCG合作伙伴 Tomasz Nurkiewicz 实现了自定义Future 。

翻译自: https://www.javacodegeeks.com/2013/02/implementing-custom-future.html

自定义注解 实现自定义消息

自定义注解 实现自定义消息_实现自定义的未来相关推荐

  1. java 扫描自定义注解_利用spring 自定义注解扫描 找出使用自定义注解的类

    我们常常有扫描项目里带有指定注解的class, 下面是利用spring扫描自定义注解的方法, 还是比较灵活的 我这里将扫描到的class放到map, 你可以放到其他地方,以便后期使用 import l ...

  2. day05 Spring中自定义注解的用处-之获取自定义的Servie

    PS: 在RPC远程调用中,想要获取自定义的service的方法,就得自定义标签遍历拿到方法 PS:在spring中,两个最核心的 概念是aop和ioc,aop其实就是动态代理. ioc 就是解决对象 ...

  3. @Transactional +自定义注解不生效_技巧分享丨SolidWorks零件自定义属性如何连接到工程图?...

    ​ "3D数据协同挖掘" 小象来了~ 很多工程师给小象留言了--说自己的零件属性填写了,但是在SolidWorks工程图里面无法调用出零件的自定义属性那么我们如何操作才能正确调用出 ...

  4. SentinelResource注解配置下_客户自定义限流处理_削峰填谷_流量控制_速率控制_服务熔断_服务降级---微服务升级_SpringCloud Alibaba工作笔记0047

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 上面几节,我们把@SentinelResource说的差不多了,但实际上,sentinel也支持代 ...

  5. SentinelResource注解配置中_客户自定义限流处理_削峰填谷_流量控制_速率控制_服务熔断_服务降级---微服务升级_SpringCloud Alibaba工作笔记0046

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 接下来咱们解决上一节说到的问题

  6. ansj 自定义 停用词_构造自定义停用词列表的快速提示

    ansj 自定义 停用词 by Kavita Ganesan 通过Kavita Ganesan 构造自定义停用词列表的快速提示 (Quick tips for constructing custom ...

  7. 如何设置自定义任务栏图标_轻松自定义Windows 7任务栏图标

    如何设置自定义任务栏图标 Would you like to fully customize your Windows 7 icons without having to manually chang ...

  8. 自定义音频播放器_创建自定义HTML5音频播放器

    自定义音频播放器 在本教程中,我将向您介绍HTML5音频,并向您展示如何创建自己的播放器. 如果您想走捷径,请查看Envato市场上可用的现成的HTML5音频播放器 . 它使您可以从各种来源创建播放列 ...

  9. java 自定义注解_两步实现Java自定义注解

    什么是注解? 注解就是为容器提供元数据,例如@Controller 注解则是标记了该Bean需要交给Spring容器进行管理. 那么我们怎么去实现自个的注解,也就是自定义注解呢? 一.自定义一个注解( ...

最新文章

  1. java字符存储_用java的类集框架做一个字符存储器(15)
  2. iphone开发畅销书TOP5(china-pub内部数据)
  3. Python divmod方法
  4. 【Linux系列】【基础版】第二章 文件、目录管理
  5. .net core3.0上传文件出现404_使用FTP代理功能连接空间上传文件(解决改善用户上传慢的问题)...
  6. 计算机考研单科成绩要求,考研单科分数是多少
  7. 码支付(php版本)应用
  8. Kotlin实战【六】Kotlin中集合的创建
  9. android写一个遥控器界面,遥控器界面软件的设计 - 基于安卓系统手机WiFi的家用智能遥控器开发...
  10. 后台管理系统 – 权限管理
  11. 【深度干货】强化学习应用简述
  12. 计算机电路与电子学试卷,电路与电子学(第5版)学习指导与习题解答
  13. Ceph新建monitor或者osd报错:use --overwrite-conf to overwrite
  14. 英雄无敌服务器维护,英雄无敌王国服务器地图攻略
  15. 学习笔记 - 正态分布
  16. android重力传感器横竖反,Android重力传感器--随重力旋转的图标
  17. java assert int_java中的assert(转载)2
  18. Docker安装OnlyOffice并配置自签证书和自己的域名证书
  19. Android字符串格式化
  20. 明朝那些事(三言二拍)

热门文章

  1. springboot整合spring @Cache和Redis
  2. Vue.js2.0开发环境搭建(二)
  3. 一篇文章彻底了解清楚什么是负载均衡
  4. 厉害了,关于String的10道经典面试题
  5. 漫画:什么是动态规划?(整合版)
  6. javaWeb服务详解【客户端调用】(含源代码,测试通过,注释) ——测试
  7. 判断字符串相等能否用==
  8. Typora的使用技巧
  9. C++实现字符串的拼接
  10. spark submit参数及调优